Files
aigrammar/tools/puzzle.py
2025-07-21 18:25:17 +08:00

273 lines
8.7 KiB
Python

'''
Word list source: https://diginoodles.com/projects/eowl
'''
import os
import json
import random
import time
import logging
import argparse
from collections import defaultdict
from pathlib import Path
from openai import AzureOpenAI
# --- Azure OpenAI client configuration -------------------------------------
endpoint = "https://grammar.openai.azure.com/"
# NOTE(review): model_name is not referenced anywhere else in the visible
# source; requests are sent using `deployment` instead.
model_name = "gpt-4o"
deployment = "gpt4"
# SECURITY: the literal below is a credential committed to source control and
# should be treated as compromised: rotate it and delete the fallback.
# The AZURE_OPENAI_API_KEY environment variable takes precedence when set, so
# the key can already be supplied without editing this file.
subscription_key = os.environ.get(
    "AZURE_OPENAI_API_KEY",
    "8b68c235b737488ab9a99983a14f8cca",
)
api_version = "2024-12-01-preview"
client = AzureOpenAI(
    api_version=api_version,
    azure_endpoint=endpoint,
    api_key=subscription_key,
)

# --- Logging: timestamped messages to the console ---------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s: %(message)s",
    handlers=[logging.StreamHandler()]
)

# --- Data locations (relative to the current working directory) -------------
BASE_DIR = './data'
WORDS_DIR = f"{BASE_DIR}/EOWL-v1.1.2/LF Delimited Format"
RESULT_DIR = f"{BASE_DIR}/result"
# The result directory is created at import time so later writes cannot fail
# on a missing folder.
os.makedirs(RESULT_DIR, exist_ok=True)
TEMP_FILE = f"{BASE_DIR}/temp_words.txt"
# Number of words sent to the model per request.
batch_words_size = 100
def find_words_files(folder):
    """Return the ``*.txt`` files directly inside *folder* whose name contains "Words".

    Args:
        folder: Directory to search (not recursive).

    Returns:
        A list of ``pathlib.Path`` objects, in the order ``glob`` yields them.
    """
    return [path for path in Path(folder).glob("*.txt") if "Words" in path.name]
def collect_words(files):
    """Read every file in *files* and return the unique words found.

    Each line is stripped of surrounding whitespace; entries shorter than
    three characters (including blank lines) are discarded.

    Args:
        files: Iterable of paths to UTF-8 text files, one word per line.

    Returns:
        A list of unique words in sorted order. Sorting makes the result
        deterministic, so batches built from it are the same on every run
        (iterating a set of strings gives a different order per process).
    """
    unique_words = set()
    for path in files:
        with open(path, 'r', encoding='utf-8') as handle:
            stripped = (line.strip() for line in handle)
            unique_words.update(word for word in stripped if len(word) >= 3)
    return sorted(unique_words)
def write_temp(words):
    """Write *words* to ``TEMP_FILE``, one word per line, replacing any previous content.

    Args:
        words: Iterable of strings.
    """
    with open(TEMP_FILE, 'w', encoding='utf-8') as out:
        out.writelines(word + '\n' for word in words)
def read_batches(batch_size=batch_words_size):
    """Yield the words stored in ``TEMP_FILE`` in consecutive batches.

    This is a generator: the file is read in full when iteration starts, and
    blank lines are skipped.

    Args:
        batch_size: Maximum number of words per batch.

    Yields:
        Lists of at most *batch_size* words; only the last may be shorter.
    """
    with open(TEMP_FILE, 'r', encoding='utf-8') as src:
        cleaned = (raw.strip() for raw in src)
        entries = [entry for entry in cleaned if entry]
    total = len(entries)
    for start in range(0, total, batch_size):
        stop = start + batch_size
        yield entries[start:stop]
# Stray prompt snippet preserved from the original source, where it was a
# no-op string literal:
#   Please respond with pure JSON only, without any formatting or explanations.
def build_prompt(words):
    """Build the user prompt asking the model to annotate *words*.

    The prompt requests, for each word, a JSON object with the keys ``w``
    (word), ``t`` (theme), ``p`` (part of speech), ``f`` (frequency) and
    ``s`` (same-root group), returned as a pure JSON array.

    Args:
        words: Iterable of words; they are joined with ", " into the prompt.

    Returns:
        The complete prompt text.
    """
    joined = ", ".join(words)
    # The template lines sit at column 0 so the text sent to the model
    # carries no leading indentation.
    return f"""
Please analyze the following list of English words and do the following:
1. Classify each word into a theme (like Animals, Plants, Materials, Body Parts, Clothes & Accessories, Food & Drinks, Places, Transportation, Sports, Colors, Numbers, Emotions, Tools, People & Occupations, etc.).
2. Identify the part of speech of each word (verb, noun, adjective, etc.).
3. Mark the frequency of usage of each word in everyday English as High, Medium, or Low.
4. Identify words with the same word root and group them.
For each word, return a JSON array where each item is an object with these keys:
- w: the word
- t: theme (like Animals, Tools, etc.)
- p: part of speech (noun, verb, etc.)
- f: frequency (Low/Medium/High)
- s: same root group (array of words with the same root)
Respond with PURE JSON ONLY, without markdown or explanations.
Here are the words:
{joined}
"""
def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (``` or ```json) from *text*, if present."""
    if not text.startswith("```"):
        return text
    body = text[3:]
    # Drop an optional "json" language tag right after the opening fence.
    if body[:4].lower() == "json":
        body = body[4:]
    body = body.rstrip()
    # Only cut the closing fence when it is really there; a truncated reply
    # must not lose its last three characters.
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


def call_openai_with_retry(prompt, retries=3, delay=5):
    """Send *prompt* to the chat-completions deployment, retrying on failure.

    Args:
        prompt: User message text.
        retries: Maximum number of attempts.
        delay: Seconds to wait between failed attempts.

    Returns:
        The reply text, stripped of surrounding whitespace and of any
        Markdown code fence, or ``None`` if every attempt failed.
    """
    for attempt in range(retries):
        try:
            response = client.chat.completions.create(
                messages=[
                    {"role": "system", "content": "You are an expert English linguist and lexicographer."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=16000,
                temperature=0.7,
                top_p=1.0,
                model=deployment
            )
            # A missing (None) content raises AttributeError here, which is
            # caught below and handled like any other failed attempt.
            text = response.choices[0].message.content.strip()
            return _strip_code_fence(text)
        except Exception as e:
            # Broad on purpose: any request failure leads to a retry.
            logging.warning(f"OpenAI request failed (attempt {attempt+1}): {e}")
            # No point in waiting after the final attempt.
            if attempt + 1 < retries:
                time.sleep(delay)
    logging.error("OpenAI request failed after all retries.")
    return None
def save_result(index, req, resp, is_json):
    """Persist one batch's request and response as a JSON file in ``RESULT_DIR``.

    The file name is ``<index zero-padded to 5>_<matched|notmatch>_<json|txt>.json``.
    A batch is "matched" only when the response was parsed as JSON, is a list,
    and has exactly one entry per requested word.

    Args:
        index: Batch number, used as the file-name prefix.
        req: List of words that were sent.
        resp: Parsed JSON when *is_json* is true, otherwise the raw text
            (or an error message).
        is_json: Whether *resp* is parsed JSON.
    """
    # Valid JSON may be a scalar (number, bool, null) with no len(); count it
    # as length 0 instead of raising TypeError.
    if is_json and isinstance(resp, (list, dict, str)):
        rsp_len = len(resp)
    else:
        rsp_len = 0
    # Only a JSON array can hold one entry per word; a dict or string of the
    # same length is not a match.
    matched = bool(is_json) and isinstance(resp, list) and len(req) == rsp_len
    flag = "json" if is_json else "txt"
    match_str = "matched" if matched else 'notmatch'
    filename = f"{RESULT_DIR}/{str(index).zfill(5)}_{match_str}_{flag}.json"
    data = {
        'req_len': len(req),
        'rsp_len': rsp_len,
        'match': matched,
        'req': req,
        'rsp': resp
    }
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    logging.info(f"Saved result to {filename}")
def process_folder(folder):
    """Run the full annotation pass over the word files found in *folder*.

    Collects the unique words, writes them to the temp file, then sends them
    to the model batch by batch and saves one result file per batch. Batch
    numbers start at 1.

    Args:
        folder: Directory containing the word-list ``.txt`` files.
    """
    sources = find_words_files(folder)
    logging.info(f"Found {len(sources)} files to process.")
    vocabulary = collect_words(sources)
    logging.info(f"Collected {len(vocabulary)} unique words.")
    write_temp(vocabulary)

    for batch_no, chunk in enumerate(read_batches(), start=1):
        logging.info(f"Processing batch {batch_no} with {len(chunk)} words")
        raw_reply = call_openai_with_retry(build_prompt(chunk))
        if raw_reply is None:
            # Record the failure so the batch can be retried later.
            save_result(batch_no, chunk, "Failed to get response", False)
            continue
        try:
            parsed = json.loads(raw_reply)
        except json.JSONDecodeError:
            logging.warning(f"Batch {batch_no} response is not valid JSON.")
            save_result(batch_no, chunk, raw_reply, False)
        else:
            save_result(batch_no, chunk, parsed, True)
        time.sleep(2)  # pause between batches
# Redo logic
def redo_results():
    """Re-request every batch in ``RESULT_DIR`` that is not marked as matched.

    For each ``*.json`` result file whose name does not contain "matched",
    the stored request words are sent to the model again. When the reply is a
    JSON array with one entry per word, a new result file is written and the
    old one removed; otherwise the old file is left in place.
    """
    for f in sorted(Path(RESULT_DIR).glob('*.json')):
        if 'matched' in f.name:
            continue
        logging.info(f"Redoing {f}")
        try:
            with open(f, 'r', encoding='utf-8') as fp:
                data = json.load(fp)
            words = data.get("req")
            if not words:
                logging.warning(f"No req in {f}")
                continue
            resp_text = call_openai_with_retry(build_prompt(words))
            if resp_text is None:
                logging.warning(f"Failed to get response: {f}")
                continue
            try:
                resp_json = json.loads(resp_text)
            except json.JSONDecodeError:
                logging.warning(f"response is not valid JSON: {f}")
            else:
                if not isinstance(resp_json, list):
                    # Valid JSON but not an array (object, number, ...), so it
                    # cannot hold one entry per word.
                    logging.warning(f"response is not a JSON array: {f}")
                elif len(words) == len(resp_json):
                    logging.info(f"get correct response. rewrite file. {f}")
                    # Parse the index and write the new file BEFORE deleting
                    # the old one, so a failure cannot lose the request data.
                    # The new file is named "..._matched_...", so it never
                    # collides with the file removed here.
                    index = int(f.name[:5])
                    save_result(index, words, resp_json, True)
                    f.unlink()
                else:
                    logging.warning(f"response not complete: {f}, req len: {len(words)}, rsp len: {len(resp_json)}")
            time.sleep(2)  # pause between batches
        except Exception as e:
            # Keep going: one unreadable or malformed file must not stop the run.
            logging.error(f"Error processing {f}: {e}")
# Check whether a word has no repeated letters
def has_no_repeated_letters(word):
    """Return True when no character occurs more than once in *word*.

    The comparison is case-sensitive, and the empty string counts as having
    no repeats.
    """
    seen = set()
    for ch in word:
        if ch in seen:
            return False
        seen.add(ch)
    return True
def generate_wordlist():
    """
    Extract words without repeated letters from the matched result files in
    RESULT_DIR, write them grouped by frequency ``f`` to ``words_{f}.txt``,
    and write all of them to ``wordlist.txt``.
    """
    word_map = defaultdict(list)
    all_words = set()

    # Keep only matched result files whose name starts with a 5-digit index.
    # The context manager closes the directory iterator promptly.
    with os.scandir(RESULT_DIR) as entries:
        matched_files = [
            entry for entry in entries
            if entry.is_file()
            and entry.name.endswith('.json')
            and 'matched' in entry.name
            and entry.name[:5].isdigit()
        ]

    for entry in sorted(matched_files, key=lambda e: int(e.name[:5])):
        with open(entry.path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        rsp = data.get('rsp', [])
        if not isinstance(rsp, list):
            logging.warning(f"Skipping {entry.path}: 'rsp' is not a list")
            continue
        for item in rsp:
            # The response comes from the model, so its shape is not guaranteed.
            if not isinstance(item, dict):
                continue
            word = item.get('w')
            freq = item.get('f')
            if not (isinstance(word, str) and isinstance(freq, str)):
                continue
            # The frequency label becomes part of a file name; replace every
            # non-alphanumeric character so it cannot contain path separators.
            label = "".join(ch if ch.isalnum() else "_" for ch in freq.strip())
            if word and label and has_no_repeated_letters(word):
                word_map[label].append(word)
                all_words.add(word)

    # Write one file per frequency label
    for label, words in word_map.items():
        filename = os.path.join(RESULT_DIR, f'words_{label}.txt')
        with open(filename, 'w', encoding='utf-8') as f:
            f.writelines(word + '\n' for word in words)
        logging.info(f'✅ 写入完成: {filename} ({len(words)} 个单词)')

    # Write the full list, sorted so the output is identical on every run
    filename = os.path.join(RESULT_DIR, 'wordlist.txt')
    with open(filename, 'w', encoding='utf-8') as f:
        f.writelines(word + '\n' for word in sorted(all_words))
    logging.info(f'✅ 写入完成: {filename} ({len(all_words)} 个单词)')
def main():
    """Command-line entry point: dispatch on the positional ``cmd`` argument.

    Commands:
        init: annotate every word batch from ``WORDS_DIR``.
        redo: retry the batches that are not marked as matched.
        gen:  build the word-list files from the matched results.

    Any other value prints a usage hint.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('cmd', help='执行的命令: init / redo / gen')
    args = parser.parse_args()
    if args.cmd == 'init':
        process_folder(WORDS_DIR)
    elif args.cmd == 'redo':
        redo_results()
    elif args.cmd == 'gen':
        generate_wordlist()
    else:
        # The hint must name the commands that really exist; the first one
        # is "init" (the message used to say "all").
        print("❌ 未知命令,请使用: init / redo / gen")


if __name__ == '__main__':
    main()