Source code for scribe_data.check.check_missing_forms.generate_query

# SPDX-License-Identifier: GPL-3.0-or-later
"""
Generate SPARQL queries for missing lexeme forms.
"""

import os
import re
from pathlib import Path

from scribe_data.check.check_missing_forms.normalize_forms import sort_qids_by_position
from scribe_data.utils import (
    LANGUAGE_DATA_EXTRACTION_DIR as language_data_extraction,
)
from scribe_data.utils import (
    data_type_metadata,
    language_metadata,
    lexeme_form_metadata,
    sub_languages,
)


[docs] def get_available_filename(base_path): """ Find the next available filename by incrementing counter if file exists. Parameters ---------- base_path : str Base path for the query file. Returns ------- str Available filename that doesn't conflict with existing files. Examples -------- If no files exist: - Returns query_{data_type}.sparql If query_{data_type}.sparql exists: - Renames existing query_{data_type}.sparql to query_{data_type}_1.sparql - Returns query_{data_type}_2.sparql If last file is query_{data_type}_N.sparql: - Returns query_{data_type}_(N+1).sparql """ base_dir = os.path.dirname(base_path) base_name = os.path.basename(base_path) name, ext = os.path.splitext(base_name) # ext : ".sparql" # If directory doesn't exist, return base name. if not os.path.exists(base_dir): return base_path # Check for existing files. existing_files = [ f for f in os.listdir(base_dir) if f.startswith(name) and f.endswith(ext) ] # If no files exist, use base name. if not existing_files: return base_path # Check if base file exists (query_{data_type}.sparql). if base_name in existing_files: # Rename base file to query_{data_type}_1.sparql. old_path = os.path.join(base_dir, base_name) new_path = os.path.join(base_dir, f"{name}_1{ext}") os.rename(old_path, new_path) # Return query_{data_type}_2.sparql for new file. return os.path.join(base_dir, f"{name}_2{ext}") # Find highest number in existing files. max_num = 0 for f in existing_files: if match := re.search(rf"{name}_(\d+){ext}$", f): num = int(match[1]) max_num = max(max_num, num) # Return next number in sequence. return os.path.join(base_dir, f"{name}_{max_num + 1}{ext}")
[docs] def generate_query(missing_features, query_dir=None, sub_lang_iso_code=None): """ Generate SPARQL queries for missing lexeme forms. Parameters ---------- missing_features : dict Dictionary containing missing features by language and data type. Format: {language_qid: {data_type_qid: [[form_qids]]}}. query_dir : str or Path, optional Directory where query files should be saved. If None, uses default language_data_extraction directory. sub_lang_iso_code : str The ISO-2 code of a sub-language if there is one being provided. Returns ------- str Path to the generated query file. Notes ----- - Generates a single query file combining all forms for a given language and data type combination. - Query files are named incrementally if duplicates exist. - Creates necessary directories if they don't exist. """ language_qid = next(iter(missing_features.keys())) data_type_qid = next(iter(missing_features[language_qid].keys())) # Find the language entry by QID. language_entry = None for name, data in language_metadata.items(): if data.get("qid") == language_qid: language_entry = (name, data) break # Check sub-languages if main language not found if "sub_languages" in data: for sub_name, sub_data in data["sub_languages"].items(): if sub_data.get("qid") == language_qid: # Use main language name instead of sub_name. language_entry = ( name, sub_data, ) break if language_entry is None: raise ValueError(f"Language with QID {language_qid} not found in metadata") language = language_entry[0] # the language name data_type = next( name for name, qid in data_type_metadata.items() if qid == data_type_qid ) # Create a QID to label mapping from the metadata. qid_to_label = {} for category in lexeme_form_metadata.values(): for item in category.values(): qid_to_label[item["qid"]] = item["label"] # Process all forms at once. forms_query = [] all_form_combinations = sort_qids_by_position( nested_qids=missing_features[language_qid][data_type_qid] ) # Keep track of used labels to avoid duplicates used_labels = set() for form_qids in all_form_combinations: # Convert QIDs to labels and join them together. labels = [qid_to_label.get(qid, qid) for qid in form_qids] concatenated_label = "".join(labels) # Make first letter lowercase. concatenated_label = concatenated_label[0].lower() + concatenated_label[1:] # Only add if this label hasn't been used before if concatenated_label not in used_labels: forms_query.append({"label": concatenated_label, "qids": form_qids}) used_labels.add(concatenated_label) body_data_type = data_type.replace("_", "")[:-1] # nouns: noun, s # Generate a single query for all forms. main_body = f"""# tool: scribe-data # All {language.capitalize()} ({language_qid}) {data_type} ({data_type_qid}) and the given forms. # Enter this query at https://query.wikidata.org/. SELECT (REPLACE(STR(?lexeme), "http://www.wikidata.org/entity/", "") AS ?lexemeID) ?lastModified ?{body_data_type} """ + "\n ".join(f'?{form["label"]}' for form in forms_query) where_clause = f""" WHERE {{ ?lexeme dct:language wd:{language_qid} ; wikibase:lexicalCategory wd:{data_type_qid} ; wikibase:lemma ?{body_data_type} ; schema:dateModified ?lastModified . """ if sub_lang_iso_code: try: for data_type_qid in sub_languages[language]: if data_type_qid == sub_lang_iso_code: sub_lang_name = sub_languages[language][sub_lang_iso_code]["name"] break except (KeyError, TypeError): print( f"Warning: Could not find sub-language data for {language} - {sub_lang_iso_code}" ) return None where_clause += f""" # Note: We need to filter for {sub_lang_iso_code} to remove {sub_lang_name} ({sub_lang_iso_code}) words. FILTER(lang(?{body_data_type}) = "{sub_lang_iso_code}") """ # Generate OPTIONAL clauses for all forms in one query. optional_clauses = "" for form in forms_query: qids = ", ".join(f"wd:{qid}" for qid in form["qids"]) optional_clauses += f""" OPTIONAL {{ ?lexeme ontolex:lexicalForm ?{form['label']}Form . ?{form['label']}Form ontolex:representation ?{form['label']} ; wikibase:grammaticalFeature {qids} . }} """ # Print the complete query. final_query = main_body + where_clause + optional_clauses + "}\n" # Create base filename. if sub_lang_iso_code: base_file_name = f"{query_dir}/{language}/{sub_lang_name}/{data_type}/query_{data_type}.sparql" elif query_dir: base_file_name = ( Path(query_dir) / language / data_type / f"query_{data_type}.sparql" ) else: base_file_name = f"{language_data_extraction}/{language}/{data_type}/query_{data_type}.sparql" # Get the next available filename. file_name = get_available_filename(str(base_file_name)) # Create directory if it doesn't exist. os.makedirs(os.path.dirname(file_name), exist_ok=True) # Write query to file. with open(file_name, "w") as file: file.write(final_query) print(f"Query file created: {file_name}") return file_name