# SPDX-License-Identifier: GPL-3.0-or-later
"""
Converts all or desired JSON data generated by Scribe-Data into SQLite databases.
"""
import json
import os
import re
import sqlite3
from pathlib import Path
import questionary
from tqdm.auto import tqdm
from scribe_data.utils import (
DEFAULT_JSON_EXPORT_DIR,
DEFAULT_SQLITE_EXPORT_DIR,
camel_to_snake,
data_type_metadata,
get_language_iso,
language_metadata,
list_all_languages,
)
[docs]
def create_table(
cursor: sqlite3.Cursor, identifier_case: str, data_type: str, cols: list[str]
) -> None:
"""
Create a table in the language database.
Parameters
----------
cursor : sqlite3.Cursor
A sqlite3 cursor.
identifier_case : str
Either "camel" or "snake" to determine column naming.
data_type : str
The name of the table to be created.
cols : list of str
The names of columns for the new table.
"""
processed_cols = []
seen_cols = set()
for col in cols:
col_name = camel_to_snake(col) if identifier_case == "snake" else col
col_lower = col_name.lower()
if col_lower in seen_cols:
counter = 1
while f"{col_lower}_{counter}" in seen_cols:
counter += 1
col_name = f"{col_name}_{counter}"
col_lower = f"{col_lower}_{counter}"
seen_cols.add(col_lower)
processed_cols.append(col_name)
quoted_cols = [f"[{c}]" for c in processed_cols]
sql_statement = (
f"CREATE TABLE IF NOT EXISTS [{data_type}] "
f"({' Text, '.join(quoted_cols)} Text, unique({quoted_cols[0]}))"
)
cursor.execute(sql_statement)
[docs]
def table_insert(cursor: sqlite3.Cursor, data_type: str, keys: list) -> None:
"""
Insert a row into a language database table.
Parameters
----------
cursor : sqlite3.Cursor
A sqlite3 cursor.
data_type : str
The name of the table to be inserted into.
keys : list of any
The values to be inserted into the table row.
"""
insert_placeholders = ", ".join(["?"] * len(keys))
sql_statement = f"INSERT OR IGNORE INTO [{data_type}] VALUES({insert_placeholders})"
cursor.execute(sql_statement, keys)
[docs]
def translations_to_sqlite(
language_data_type_dict: dict,
current_languages: list,
identifier_case: str = "snake",
input_file: Path = DEFAULT_JSON_EXPORT_DIR,
output_file: Path = DEFAULT_SQLITE_EXPORT_DIR,
overwrite: bool = False,
) -> None:
"""
Derive translations to create a TranslationData.sqlite file that contains a table for each language.
Parameters
----------
language_data_type_dict : dict
A dictionary specifying the data types for each language.
current_languages : list
A list of current languages.
identifier_case : str, optional
The identifier case. Default is "snake".
input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
The input JSON export directory.
output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
The output SQLite export directory.
overwrite : bool, optional
If True, existing SQLite files will be overwritten without prompting.
"""
maybe_over = ""
translation_db_path = Path(output_file) / "TranslationData.sqlite"
if translation_db_path.exists():
if not overwrite:
answer = questionary.confirm(
f"SQLite file {translation_db_path} already exists. Overwrite?"
).ask()
if not answer:
print("Skipping translation DB creation.")
return
os.remove(translation_db_path)
maybe_over = "over"
connection = sqlite3.connect(translation_db_path)
cursor = connection.cursor()
print(f"Database for translations {maybe_over}written and connection made.")
for lang in tqdm(
language_data_type_dict,
desc="Tables added",
unit="tables",
):
print(f"Creating/Updating {lang} translations table...")
json_file_path = Path(input_file) / lang / "translations.json"
if not json_file_path.exists():
print(
f"Skipping {lang} translations table creation as JSON file not found."
)
continue
with open(json_file_path, "r", encoding="utf-8") as f:
json_data = json.load(f)
# Define columns: lexeme_id, lastModified, word, and language columns
target_cols = [get_language_iso(language) for language in current_languages]
cols = ["lexeme_id", "lastModified", "word"] + target_cols
create_table(cursor, identifier_case, data_type=lang, cols=cols)
cursor.execute(f"DELETE FROM [{lang}]") # clear existing data
for lexeme_id, entry in json_data.items():
word = next(key for key in entry.keys() if key != "lastModified")
translations = entry[word]
keys = [lexeme_id, entry["lastModified"], word]
# Add translations for each target language.
keys.extend(translations.get(target_lang) for target_lang in target_cols)
table_insert(cursor, data_type=lang, keys=keys)
try:
connection.commit()
print(f"{lang} translations table created/updated successfully.\n")
except sqlite3.Error as e:
print(f"Error creating/updating {lang} translations table: {e}")
connection.close()
print("Translations database processing completed.\n")
[docs]
def wiktionary_translations_to_sqlite(
language,
identifier_case="snake",
input_file=DEFAULT_JSON_EXPORT_DIR,
output_file=DEFAULT_SQLITE_EXPORT_DIR,
overwrite: bool = False,
):
"""
Convert Wiktionary *_translations_from_*.json files into SQLite tables.
Each JSON file has the structure:
{ word: { word_type: { order: { description, translation } } } }
The resulting SQLite table uses columns:
word | word_type | order | description | translation
Parameters
----------
language : str
The language whose directory contains the translation JSON files.
identifier_case : str, optional
Either "camel" or "snake" to determine column naming. Default is "snake".
input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
The input JSON export directory.
output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
The output SQLite export directory.
overwrite : bool, optional
If True, existing SQLite files will be overwritten without prompting.
"""
lang_dir = Path(input_file) / "/".join(reversed(language.split()))
if not lang_dir.is_dir():
print(
f"Warning: Directory '{lang_dir}' does not exist. "
f"Skipping Wiktionary translations for '{language}'."
)
return
# Find all *_translations_from_*.json files.
translation_files = sorted(
f
for f in lang_dir.iterdir()
if f.suffix == ".json" and re.match(r".+_translations_from_.+\.json$", f.name)
)
if not translation_files:
return
db_path = Path(output_file) / "TranslationData.sqlite"
db_path.parent.mkdir(parents=True, exist_ok=True)
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
for json_path in translation_files:
# Derive the table name from the filename (e.g. "ar_translations_from_en").
table_name = json_path.stem
print(f"Creating/Updating {language} {table_name} table...")
if json_path.stat().st_size == 0:
print(f"Warning: {json_path.name} is empty. Skipping.")
continue
with open(json_path, "r", encoding="utf-8") as f:
json_data = json.load(f)
# Define columns.
cols = ["word", "word_type", "word_order", "description", "translation"]
if identifier_case == "camel":
cols = ["word", "wordType", "wordOrder", "description", "translation"]
# Create table (unique on word + word_type + order combination).
col_defs = ", ".join(f"{c} Text" for c in cols)
cursor.execute(
f"CREATE TABLE IF NOT EXISTS [{table_name}] "
f"({col_defs}, UNIQUE(word, {cols[1]}, {cols[2]}))"
)
cursor.execute(f"DELETE FROM [{table_name}]") # clear existing data
# Flatten the nested JSON and insert rows.
insert_sql = (
f"INSERT OR IGNORE INTO [{table_name}] "
f"VALUES ({', '.join(['?'] * len(cols))})"
)
for word, word_types in json_data.items():
for word_type, senses in word_types.items():
for order, entry in senses.items():
row = (
word,
word_type,
order,
entry.get("description", ""),
entry.get("translation", ""),
)
cursor.execute(insert_sql, row)
try:
connection.commit()
print(f"{table_name} table created/updated successfully.")
except sqlite3.Error as e:
print(f"Error creating/updating {table_name} table: {e}")
connection.close()
print(f"Wiktionary translation tables for {language} processed successfully.\n")
[docs]
def convert_to_sqlite(
languages: list[str] | None = None,
specific_tables: str | list[str] | None = None,
identifier_case: str = "camel",
input_file: Path = DEFAULT_JSON_EXPORT_DIR,
output_file: Path = DEFAULT_SQLITE_EXPORT_DIR,
overwrite: bool = False,
) -> None:
"""
Create SQLite databases from JSON data.
Parameters
----------
languages : Optional[List[str]]
The languages to process. If None, use all available languages.
specific_tables : Optional[Union[str, List[str]]]
The specific tables to process. If None, process all tables.
identifier_case : str, optional, default='camel'
Format of the identifiers ("camel" or "snake"). Defaults to "camel".
input_file : str, optional, default=DEFAULT_JSON_EXPORT_DIR
The input JSON export directory.
output_file : str, optional, default=DEFAULT_SQLITE_EXPORT_DIR
The output SQLite export directory.
overwrite : bool, optional
If set to True, existing SQLite files will be overwritten without prompting.
"""
specific_tables = (
[specific_tables]
if specific_tables and isinstance(specific_tables, str)
else specific_tables
)
# Ensure the SQLite export directory exists before creating the database.
sqlite_export_dir = Path(output_file)
sqlite_export_dir.mkdir(parents=True, exist_ok=True)
current_language_data = language_metadata
data_types = data_type_metadata
current_languages = list_all_languages(current_language_data)
if not languages:
languages = current_languages
elif isinstance(languages, str):
languages = [languages.lower()]
elif isinstance(languages, list):
languages = [lang.lower() for lang in languages]
if not set(languages).issubset(current_languages):
raise ValueError(
f"Invalid language(s) specified. Available languages are: {', '.join(current_languages)}"
)
# Prepare data types to process.
language_data_type_dict = {}
for lang in languages:
lang_dir = Path(input_file) / "/".join(reversed(lang.split()))
if lang_dir.is_dir():
language_data_type_dict[lang] = [
f.split(".json")[0]
for f in os.listdir(lang_dir)
if f.split(".json")[0] in (specific_tables or data_types)
]
else:
print(
f"Warning: Directory '{lang_dir}' does not exist. Skipping language '{lang}'."
)
language_data_type_dict[lang] = []
# Check if there's any data to process.
has_data = any(language_data_type_dict.values())
has_wiktionary_translations = (
specific_tables is None or "wiktionary_translations" in specific_tables
)
if not has_data and not has_wiktionary_translations:
print(
"No data found for any of the specified languages. No SQLite databases will be created."
)
return
# Process translations if no specific_tables argument was provided
# or if "translations" is contained in the specific_tables list.
if specific_tables is None or "translations" in specific_tables:
translations_to_sqlite(
language_data_type_dict,
current_languages,
identifier_case=identifier_case,
input_file=input_file,
output_file=output_file,
overwrite=overwrite,
)
# Remove "translations" from each language's list so we don't create extra language-specific DBs
for lang in language_data_type_dict:
language_data_type_dict[lang] = [
dt for dt in language_data_type_dict[lang] if dt != "translations"
]
# Process Wiktionary *_translations_from_*.json files.
if specific_tables is None or "wiktionary_translations" in specific_tables:
for lang in languages:
wiktionary_translations_to_sqlite(
language=lang,
identifier_case=identifier_case,
input_file=input_file,
output_file=output_file,
overwrite=overwrite,
)
if specific_tables and set(specific_tables).issubset(
{"translations", "wiktionary_translations"}
):
return
if specific_tables and "autocomplete_lexicon" in specific_tables:
for lang in language_data_type_dict:
if "autocomplete_lexicon" not in language_data_type_dict[lang]:
language_data_type_dict[lang].append("autocomplete_lexicon")
languages_capitalized = [lang.capitalize() for lang in languages]
print(
f"Creating/Updating SQLite databases for the following languages: {', '.join(languages_capitalized)}"
)
if specific_tables:
print(f"Updating only the following tables: {', '.join(specific_tables)}")
for lang in tqdm(
language_data_type_dict,
desc="Databases created",
unit="dbs",
):
if language_data_type_dict[lang] != []:
maybe_over = ""
db_file = (
Path(output_file)
/ f"{get_language_iso(lang).upper()}LanguageData.sqlite"
)
if db_file.exists():
if not overwrite:
answer = questionary.confirm(
f"SQLite file {db_file} already exists.\nDo you want to overwrite it?"
).ask()
if not answer:
print(
f"Skipping {lang} database creation/update as user chose not to overwrite."
)
continue
os.remove(db_file)
maybe_over = "over"
connection = sqlite3.connect(db_file)
cursor = connection.cursor()
print(f"Database for {lang} {maybe_over}written and connection made.")
for dt in language_data_type_dict[lang]:
print(f"Creating/Updating {lang} {dt} table...")
json_file_path = Path(input_file) / lang / f"{dt}.json"
if not json_file_path.exists():
print(
f"Skipping {lang} {dt} table creation as JSON file not found."
)
continue
with open(json_file_path, "r", encoding="utf-8") as f:
json_data = json.load(f)
if dt in [
key
for key in data_type_metadata.keys()
if key not in ["translations", "emoji_keywords"]
]:
cols = ["wdLexemeId"]
all_elem_keys = [
json_data[k].keys() for k in list(json_data.keys())
]
all_keys_flat = list({k for ks in all_elem_keys for k in ks})
cols += all_keys_flat
create_table(cursor, identifier_case, data_type=dt, cols=cols)
cursor.execute(f"DELETE FROM [{dt}]") # clear existing data
for row in json_data:
keys = [row]
keys += [
json_data[row][col_name]
if col_name in json_data[row]
else None
for col_name in cols[1:]
]
table_insert(cursor, data_type=dt, keys=keys)
if dt == "nouns" and lang != "Russian":
table_insert(
cursor,
data_type=dt,
keys=["L0", "Scribe"] + [""] * (len(cols) - 2),
)
connection.commit()
elif dt in ["emoji_keywords"]:
cols = ["word"] + [f"{dt[:-1]}_{i}" for i in range(3)]
create_table(cursor, identifier_case, data_type=dt, cols=cols)
cursor.execute(f"DELETE FROM [{dt}]") # clear existing data
for row in json_data:
keys = [row]
keys += [
json_data[row][i]["emoji"]
for i in range(min(len(json_data[row]), len(cols) - 1))
]
keys += [""] * (len(cols) - len(keys))
table_insert(cursor, data_type=dt, keys=keys)
connection.commit()
connection.close()
print(f"{lang.capitalize()} database processing completed.")
else:
print(
f"Skipping {lang.capitalize()} database creation/update as no related JSON data files were found."
)
print("Database creation/update process completed.\n")