Source code for scribe_data.cli.convert.to_sqlite

# SPDX-License-Identifier: GPL-3.0-or-later
"""
Converts all or desired JSON data generated by Scribe-Data into SQLite databases.
"""

import json
import os
import re
import sqlite3
from pathlib import Path

import questionary
from tqdm.auto import tqdm

from scribe_data.utils import (
    DEFAULT_JSON_EXPORT_DIR,
    DEFAULT_SQLITE_EXPORT_DIR,
    camel_to_snake,
    data_type_metadata,
    get_language_iso,
    language_metadata,
    list_all_languages,
)


def create_table(
    cursor: sqlite3.Cursor, identifier_case: str, data_type: str, cols: list[str]
) -> None:
    """
    Create a table in the language database.

    Every column is created with the ``Text`` type and the first column gets
    a ``unique`` constraint. Column names that collide case-insensitively are
    disambiguated by appending ``_<n>`` to the later occurrences. The table is
    only created if it does not already exist.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        A sqlite3 cursor.

    identifier_case : str
        Either "camel" or "snake" to determine column naming.

    data_type : str
        The name of the table to be created.

    cols : list of str
        The names of columns for the new table. Must not be empty.

    Raises
    ------
    ValueError
        If ``cols`` is empty, as a table needs at least one column.
    """
    if not cols:
        raise ValueError(
            f"Cannot create table '{data_type}': at least one column is required."
        )

    def quote_identifier(name: str) -> str:
        # Standard SQL identifier quoting: embedded double quotes are doubled.
        # Bracket quoting ([name]) has no way to escape a "]" inside the name.
        return '"' + name.replace('"', '""') + '"'

    processed_cols = []
    seen_cols = set()
    for col in cols:
        col_name = camel_to_snake(col) if identifier_case == "snake" else col
        col_lower = col_name.lower()

        # SQLite column names are case-insensitive, so compare lowercased names.
        if col_lower in seen_cols:
            counter = 1
            while f"{col_lower}_{counter}" in seen_cols:
                counter += 1

            col_name = f"{col_name}_{counter}"
            col_lower = f"{col_lower}_{counter}"

        seen_cols.add(col_lower)
        processed_cols.append(col_name)

    quoted_cols = [quote_identifier(c) for c in processed_cols]
    column_defs = ", ".join(f"{c} Text" for c in quoted_cols)
    sql_statement = (
        f"CREATE TABLE IF NOT EXISTS {quote_identifier(data_type)} "
        f"({column_defs}, unique({quoted_cols[0]}))"
    )
    cursor.execute(sql_statement)


def table_insert(cursor: sqlite3.Cursor, data_type: str, keys: list) -> None:
    """
    Insert a row into a language database table.

    The row is inserted with ``INSERT OR IGNORE``, so a row that violates a
    uniqueness constraint of the table is silently dropped.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        A sqlite3 cursor.

    data_type : str
        The name of the table to be inserted into.

    keys : list of any
        The values to be inserted into the table row, in column order.
    """
    # Table names cannot be bound as parameters, so quote the identifier using
    # standard SQL escaping (embedded double quotes are doubled). Bracket
    # quoting ([name]) cannot represent a name that contains "]".
    quoted_table = '"' + data_type.replace('"', '""') + '"'
    insert_placeholders = ", ".join(["?"] * len(keys))
    sql_statement = (
        f"INSERT OR IGNORE INTO {quoted_table} VALUES({insert_placeholders})"
    )
    cursor.execute(sql_statement, keys)


def translations_to_sqlite(
    language_data_type_dict: dict,
    current_languages: list,
    identifier_case: str = "snake",
    input_file: Path = DEFAULT_JSON_EXPORT_DIR,
    output_file: Path = DEFAULT_SQLITE_EXPORT_DIR,
    overwrite: bool = False,
) -> None:
    """
    Derive translations to create a TranslationData.sqlite file that contains a table for each language.

    For every language key in ``language_data_type_dict`` a ``translations.json``
    file is looked up in the input directory. Languages without such a file are
    skipped. Each table has the columns ``lexeme_id``, ``lastModified``,
    ``word`` and one column per ISO code of ``current_languages``.

    Parameters
    ----------
    language_data_type_dict : dict
        A dictionary specifying the data types for each language. Only its keys
        (the language names) are used here.

    current_languages : list
        A list of current languages.

    identifier_case : str, optional
        The identifier case. Default is "snake".

    input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
        The input JSON export directory.

    output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
        The output SQLite export directory. It is created if missing.

    overwrite : bool, optional
        If True, existing SQLite files will be overwritten without prompting.
    """
    maybe_over = ""
    translation_db_path = Path(output_file) / "TranslationData.sqlite"

    # sqlite3.connect does not create missing parent directories.
    translation_db_path.parent.mkdir(parents=True, exist_ok=True)

    if translation_db_path.exists():
        if not overwrite:
            answer = questionary.confirm(
                f"SQLite file {translation_db_path} already exists. Overwrite?"
            ).ask()
            if not answer:
                print("Skipping translation DB creation.")
                return

        os.remove(translation_db_path)
        maybe_over = "over"

    connection = sqlite3.connect(translation_db_path)
    # Close the connection even if reading a JSON file or an insert fails.
    try:
        cursor = connection.cursor()
        print(f"Database for translations {maybe_over}written and connection made.")

        # Define columns: lexeme_id, lastModified, word, and language columns.
        # They are identical for every table, so compute them once.
        target_cols = [get_language_iso(language) for language in current_languages]
        cols = ["lexeme_id", "lastModified", "word"] + target_cols

        for lang in tqdm(
            language_data_type_dict,
            desc="Tables added",
            unit="tables",
        ):
            print(f"Creating/Updating {lang} translations table...")
            json_file_path = Path(input_file) / lang / "translations.json"
            if not json_file_path.exists():
                # Other converters in this module resolve multi-word language
                # names to nested, reversed directories ("a b" -> "b/a"), so
                # fall back to that layout before giving up.
                nested_json_path = (
                    Path(input_file)
                    / "/".join(reversed(lang.split()))
                    / "translations.json"
                )
                if nested_json_path.exists():
                    json_file_path = nested_json_path

                else:
                    print(
                        f"Skipping {lang} translations table creation as JSON file not found."
                    )
                    continue

            with open(json_file_path, "r", encoding="utf-8") as f:
                json_data = json.load(f)

            create_table(cursor, identifier_case, data_type=lang, cols=cols)
            cursor.execute(f"DELETE FROM [{lang}]")  # clear existing data

            for lexeme_id, entry in json_data.items():
                # The word is the only key of the entry besides "lastModified".
                word = next((key for key in entry if key != "lastModified"), None)
                if word is None:
                    # No word in this entry means there is nothing to store.
                    continue

                translations = entry[word]
                keys = [lexeme_id, entry.get("lastModified"), word]

                # Add translations for each target language.
                keys.extend(
                    translations.get(target_lang) for target_lang in target_cols
                )
                table_insert(cursor, data_type=lang, keys=keys)

            try:
                connection.commit()
                print(f"{lang} translations table created/updated successfully.\n")

            except sqlite3.Error as e:
                print(f"Error creating/updating {lang} translations table: {e}")

    finally:
        connection.close()

    print("Translations database processing completed.\n")


def wiktionary_translations_to_sqlite(
    language: str,
    identifier_case: str = "snake",
    input_file=DEFAULT_JSON_EXPORT_DIR,
    output_file=DEFAULT_SQLITE_EXPORT_DIR,
    overwrite: bool = False,
) -> None:
    """
    Convert Wiktionary *_translations_from_*.json files into SQLite tables.

    Each JSON file has the structure:
        { word: { word_type: { order: { description, translation } } } }

    The resulting SQLite table uses columns:
        word | word_type | word_order | description | translation

    One table per JSON file is written into ``TranslationData.sqlite`` in the
    output directory, named after the file stem. Existing rows of a table are
    deleted before the new rows are inserted.

    Parameters
    ----------
    language : str
        The language whose directory contains the translation JSON files.
        A multi-word name maps to nested directories in reversed word order.

    identifier_case : str, optional
        Either "camel" or "snake" to determine column naming. Default is "snake".

    input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
        The input JSON export directory.

    output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
        The output SQLite export directory. It is created if missing.

    overwrite : bool, optional
        Currently unused by this function: the database file is never removed
        and no prompt is shown, tables are always refreshed in place.
    """
    lang_dir = Path(input_file) / "/".join(reversed(language.split()))

    if not lang_dir.is_dir():
        print(
            f"Warning: Directory '{lang_dir}' does not exist. "
            f"Skipping Wiktionary translations for '{language}'."
        )
        return

    # Find all *_translations_from_*.json files.
    translation_files = sorted(
        f
        for f in lang_dir.iterdir()
        if f.suffix == ".json"
        and re.match(r".+_translations_from_.+\.json$", f.name)
    )

    if not translation_files:
        return

    # Define columns once as they only depend on the identifier case.
    cols = ["word", "word_type", "word_order", "description", "translation"]
    if identifier_case == "camel":
        cols = ["word", "wordType", "wordOrder", "description", "translation"]

    col_defs = ", ".join(f"{c} Text" for c in cols)
    placeholders = ", ".join(["?"] * len(cols))

    db_path = Path(output_file) / "TranslationData.sqlite"
    db_path.parent.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(db_path)
    # Close the connection even if a JSON file is malformed or an insert fails.
    try:
        cursor = connection.cursor()

        for json_path in translation_files:
            # Derive the table name from the filename (e.g. "ar_translations_from_en").
            table_name = json_path.stem
            print(f"Creating/Updating {language} {table_name} table...")

            if json_path.stat().st_size == 0:
                print(f"Warning: {json_path.name} is empty. Skipping.")
                continue

            with open(json_path, "r", encoding="utf-8") as f:
                json_data = json.load(f)

            # Create table (unique on word + word_type + order combination).
            cursor.execute(
                f"CREATE TABLE IF NOT EXISTS [{table_name}] "
                f"({col_defs}, UNIQUE(word, {cols[1]}, {cols[2]}))"
            )
            cursor.execute(f"DELETE FROM [{table_name}]")  # clear existing data

            # Flatten the nested JSON and insert rows.
            insert_sql = (
                f"INSERT OR IGNORE INTO [{table_name}] VALUES ({placeholders})"
            )

            for word, word_types in json_data.items():
                for word_type, senses in word_types.items():
                    for order, entry in senses.items():
                        row = (
                            word,
                            word_type,
                            order,
                            entry.get("description", ""),
                            entry.get("translation", ""),
                        )
                        cursor.execute(insert_sql, row)

            try:
                connection.commit()
                print(f"{table_name} table created/updated successfully.")

            except sqlite3.Error as e:
                print(f"Error creating/updating {table_name} table: {e}")

    finally:
        connection.close()

    print(f"Wiktionary translation tables for {language} processed successfully.\n")


def convert_to_sqlite(
    languages: list[str] | None = None,
    specific_tables: str | list[str] | None = None,
    identifier_case: str = "camel",
    input_file: Path = DEFAULT_JSON_EXPORT_DIR,
    output_file: Path = DEFAULT_SQLITE_EXPORT_DIR,
    overwrite: bool = False,
) -> None:
    """
    Create SQLite databases from JSON data.

    Translations and Wiktionary translations are delegated to
    ``translations_to_sqlite`` and ``wiktionary_translations_to_sqlite``. All
    other data types are written to one ``<ISO>LanguageData.sqlite`` file per
    language with one table per data type.

    Parameters
    ----------
    languages : Optional[List[str]]
        The languages to process. If None, use all available languages.

    specific_tables : Optional[Union[str, List[str]]]
        The specific tables to process. If None, process all tables.

    identifier_case : str, optional, default='camel'
        Format of the identifiers ("camel" or "snake"). Defaults to "camel".

    input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
        The input JSON export directory.

    output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
        The output SQLite export directory.

    overwrite : bool, optional
        If set to True, existing SQLite files will be overwritten without prompting.

    Raises
    ------
    ValueError
        If any of the requested languages is not an available language.
    """
    specific_tables = (
        [specific_tables]
        if specific_tables and isinstance(specific_tables, str)
        else specific_tables
    )

    # Ensure the SQLite export directory exists before creating the database.
    sqlite_export_dir = Path(output_file)
    sqlite_export_dir.mkdir(parents=True, exist_ok=True)

    current_language_data = language_metadata
    data_types = data_type_metadata
    current_languages = list_all_languages(current_language_data)

    if not languages:
        languages = current_languages

    elif isinstance(languages, str):
        languages = [languages.lower()]

    elif isinstance(languages, list):
        languages = [lang.lower() for lang in languages]

    if not set(languages).issubset(current_languages):
        raise ValueError(
            f"Invalid language(s) specified. Available languages are: {', '.join(current_languages)}"
        )

    # Prepare data types to process.
    allowed_tables = specific_tables or data_types
    language_data_type_dict = {}
    lang_dirs = {}
    for lang in languages:
        # Multi-word names map to nested directories in reversed word order.
        lang_dir = Path(input_file) / "/".join(reversed(lang.split()))
        lang_dirs[lang] = lang_dir
        if lang_dir.is_dir():
            # Only real ".json" files count as tables so that entries such as
            # "nouns.jsonl" or "nouns.json.bak" are not picked up as "nouns".
            language_data_type_dict[lang] = [
                Path(name).stem
                for name in os.listdir(lang_dir)
                if name.endswith(".json") and Path(name).stem in allowed_tables
            ]

        else:
            print(
                f"Warning: Directory '{lang_dir}' does not exist. Skipping language '{lang}'."
            )
            language_data_type_dict[lang] = []

    # Check if there's any data to process.
    has_data = any(language_data_type_dict.values())
    has_wiktionary_translations = (
        specific_tables is None or "wiktionary_translations" in specific_tables
    )
    if not has_data and not has_wiktionary_translations:
        print(
            "No data found for any of the specified languages. No SQLite databases will be created."
        )
        return

    # Process translations if no specific_tables argument was provided
    # or if "translations" is contained in the specific_tables list.
    if specific_tables is None or "translations" in specific_tables:
        translations_to_sqlite(
            language_data_type_dict,
            current_languages,
            identifier_case=identifier_case,
            input_file=input_file,
            output_file=output_file,
            overwrite=overwrite,
        )

    # Remove "translations" from each language's list so we don't create extra language-specific DBs
    for lang in language_data_type_dict:
        language_data_type_dict[lang] = [
            dt for dt in language_data_type_dict[lang] if dt != "translations"
        ]

    # Process Wiktionary *_translations_from_*.json files.
    if specific_tables is None or "wiktionary_translations" in specific_tables:
        for lang in languages:
            wiktionary_translations_to_sqlite(
                language=lang,
                identifier_case=identifier_case,
                input_file=input_file,
                output_file=output_file,
                overwrite=overwrite,
            )

    # Nothing else to do if only translation tables were requested.
    if specific_tables and set(specific_tables).issubset(
        {"translations", "wiktionary_translations"}
    ):
        return

    if specific_tables and "autocomplete_lexicon" in specific_tables:
        for lang in language_data_type_dict:
            if "autocomplete_lexicon" not in language_data_type_dict[lang]:
                language_data_type_dict[lang].append("autocomplete_lexicon")

    languages_capitalized = [lang.capitalize() for lang in languages]
    print(
        f"Creating/Updating SQLite databases for the following languages: {', '.join(languages_capitalized)}"
    )
    if specific_tables:
        print(f"Updating only the following tables: {', '.join(specific_tables)}")

    # Data types stored as one row per lexeme with one column per form key.
    lexeme_data_types = {
        key
        for key in data_type_metadata
        if key not in ("translations", "emoji_keywords")
    }

    for lang in tqdm(
        language_data_type_dict,
        desc="Databases created",
        unit="dbs",
    ):
        if not language_data_type_dict[lang]:
            print(
                f"Skipping {lang.capitalize()} database creation/update as no related JSON data files were found."
            )
            continue

        maybe_over = ""
        db_file = (
            Path(output_file) / f"{get_language_iso(lang).upper()}LanguageData.sqlite"
        )
        if db_file.exists():
            if not overwrite:
                answer = questionary.confirm(
                    f"SQLite file {db_file} already exists.\nDo you want to overwrite it?"
                ).ask()
                if not answer:
                    print(
                        f"Skipping {lang} database creation/update as user chose not to overwrite."
                    )
                    continue

            os.remove(db_file)
            maybe_over = "over"

        connection = sqlite3.connect(db_file)
        # Close the connection even if reading a JSON file or an insert fails.
        try:
            cursor = connection.cursor()
            print(f"Database for {lang} {maybe_over}written and connection made.")

            for dt in language_data_type_dict[lang]:
                print(f"Creating/Updating {lang} {dt} table...")
                json_file_path = Path(input_file) / lang / f"{dt}.json"
                if not json_file_path.exists():
                    # Tables were discovered in the nested language directory,
                    # which differs from the path above for multi-word names.
                    nested_json_path = lang_dirs[lang] / f"{dt}.json"
                    if nested_json_path.exists():
                        json_file_path = nested_json_path

                    else:
                        print(
                            f"Skipping {lang} {dt} table creation as JSON file not found."
                        )
                        continue

                with open(json_file_path, "r", encoding="utf-8") as f:
                    json_data = json.load(f)

                if dt in lexeme_data_types:
                    # Collect form keys in first-seen order. Going through a
                    # set made the column order change from run to run.
                    form_keys = dict.fromkeys(
                        key for forms in json_data.values() for key in forms
                    )
                    cols = ["wdLexemeId", *form_keys]

                    create_table(cursor, identifier_case, data_type=dt, cols=cols)
                    cursor.execute(f"DELETE FROM [{dt}]")  # clear existing data

                    for lexeme_id, forms in json_data.items():
                        keys = [lexeme_id]
                        keys += [forms.get(col_name) for col_name in cols[1:]]
                        table_insert(cursor, data_type=dt, keys=keys)

                    # Languages are lowercased above, so compare in lowercase.
                    # The row needs a second column to hold "Scribe", which is
                    # missing when the JSON file has no entries.
                    # NOTE(review): "Scribe" lands in the first form column,
                    # whichever key that is - confirm this is the intended one.
                    if dt == "nouns" and lang.lower() != "russian" and len(cols) >= 2:
                        table_insert(
                            cursor,
                            data_type=dt,
                            keys=["L0", "Scribe"] + [""] * (len(cols) - 2),
                        )

                    connection.commit()

                elif dt == "emoji_keywords":
                    cols = ["word"] + [f"{dt[:-1]}_{i}" for i in range(3)]
                    create_table(cursor, identifier_case, data_type=dt, cols=cols)
                    cursor.execute(f"DELETE FROM [{dt}]")  # clear existing data

                    for word, emoji_entries in json_data.items():
                        # Keep at most one emoji per emoji column and pad the
                        # remaining columns with empty strings.
                        keys = [word]
                        keys += [
                            emoji_entry["emoji"]
                            for emoji_entry in emoji_entries[: len(cols) - 1]
                        ]
                        keys += [""] * (len(cols) - len(keys))
                        table_insert(cursor, data_type=dt, keys=keys)

                    connection.commit()

        finally:
            connection.close()

        print(f"{lang.capitalize()} database processing completed.")

    print("Database creation/update process completed.\n")