# kshama/web_search_helper.py
import os
import pickle
from urllib.parse import parse_qs, quote, unquote, urlparse

import faiss
import requests
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer


class WebSearchHelper:
    """Web-search memory: scrapes DuckDuckGo results, summarises pages via a
    caller-supplied LLM function, and stores the summaries in a FAISS index
    for semantic lookup."""

    def __init__(self, kb_path="web_kb.index", meta_path="web_kb_meta.pkl"):
        # all-MiniLM-L6-v2 embeds text into 384-dim vectors, matching the
        # IndexFlatL2 dimension used in _load_index.
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")
        self.kb_path = kb_path
        self.meta_path = meta_path
        self.meta = []  # one {"url", "summary"} dict per indexed vector
        self.index = None
        self._load_index()
    def _load_index(self):
        # Reload a previously persisted index and its metadata, or start
        # an empty index sized for the embedder's 384-dim vectors.
        if os.path.exists(self.kb_path):
            self.index = faiss.read_index(self.kb_path)
            with open(self.meta_path, "rb") as f:
                self.meta = pickle.load(f)
        else:
            self.index = faiss.IndexFlatL2(384)

    def _save_index(self):
        faiss.write_index(self.index, self.kb_path)
        with open(self.meta_path, "wb") as f:
            pickle.dump(self.meta, f)
    def search_duckduckgo(self, query, num=5):
        """Scrape the top `num` result URLs from DuckDuckGo Lite."""
        results = []
        headers = {"User-Agent": "Mozilla/5.0"}
        url = f"https://lite.duckduckgo.com/lite?q={quote(query)}"
        print(f"[searching] {url}")
        res = requests.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(res.text, "html.parser")
        # Result anchors are redirects like //duckduckgo.com/l/?uddg=<target>;
        # filter for them first, then unwrap, so that navigation links don't
        # eat into the `num` quota.
        for link in soup.find_all("a", href=True):
            parsed = urlparse(link["href"])
            if not parsed.path.startswith("/l/"):
                continue
            actual_url = unquote(parse_qs(parsed.query).get("uddg", [""])[0])
            if actual_url:
                results.append(actual_url)
            if len(results) >= num:
                break
        return results
    def crawl_and_summarize(self, urls, llm_function):
        """Fetch each URL, reduce it to plain text, and summarise it with
        the caller-supplied `llm_function(text) -> str`."""
        summaries = []
        for url in urls:
            try:
                print(f"[crawling] {url}")
                html = requests.get(url, timeout=5).text
                text = BeautifulSoup(html, "html.parser").get_text()
                clean = " ".join(text.split()[:1000])  # truncate to ~1000 words
                summary = llm_function(clean)
                summaries.append((url, summary))
            except Exception as e:
                print(f"[crawl error] {url} -> {e}")
        return summaries
    def add_to_kb(self, summaries):
        # Embed each summary and keep its metadata at the matching position,
        # so FAISS row ids map back to URLs.
        for url, content in summaries:
            vec = self.embedder.encode([content])
            self.index.add(vec)
            self.meta.append({"url": url, "summary": content})
        self._save_index()

    def query_kb(self, text, top_k=3):
        """Return (L2 distances, metadata) for the `top_k` nearest entries."""
        if self.index.ntotal == 0:
            return [], []
        vec = self.embedder.encode([text])
        D, I = self.index.search(vec, top_k)
        # FAISS pads with -1 when fewer than top_k vectors exist, so guard
        # against negative ids as well as out-of-range ones.
        results = [self.meta[i] for i in I[0] if 0 <= i < len(self.meta)]
        return D[0], results
    def should_trigger_search(self, score_threshold=0.7, text=""):
        """Decide whether the KB needs fresh web results for `text`.

        Scores are L2 distances (smaller = more similar), so a nearest
        neighbour farther away than the threshold means the KB holds
        nothing relevant enough and a new search is warranted.
        """
        if self.index.ntotal == 0:
            return True
        scores, _ = self.query_kb(text, top_k=1)
        if len(scores) == 0:
            return True
        return scores[0] > score_threshold
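

# A minimal usage sketch, not part of the original module: `fake_summarize`
# is a hypothetical stand-in for whatever LLM call the surrounding project
# wires in; any real `llm_function` just needs to map a text chunk to a
# summary string.
if __name__ == "__main__":
    def fake_summarize(text):
        # Placeholder "LLM": keep the first 50 words as the summary.
        return " ".join(text.split()[:50])

    helper = WebSearchHelper()
    question = "What is retrieval-augmented generation?"

    # Search, crawl, and index only when the KB has nothing close enough.
    if helper.should_trigger_search(text=question):
        urls = helper.search_duckduckgo(question, num=3)
        summaries = helper.crawl_and_summarize(urls, fake_summarize)
        helper.add_to_kb(summaries)

    distances, hits = helper.query_kb(question)
    for dist, hit in zip(distances, hits):
        print(f"{dist:.3f}  {hit['url']}")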