import logging
import re
from typing import List, Tuple, Set
import warnings
import pandas as pd
from skimmatch import FuzzyMatcherMulti, FuzzyMatcherMultiHi
from .config import QuerexfuzzConfig
from .dates import resolve_date_range
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Module-level statement: this record is emitted when the module body runs (at import).
logger.info('engine setup')
# warning categories
class QuerexfuzzConfigurationWarning(UserWarning):
    """Warning raised when Querexfuzz configuration is unusual or suboptimal.

    Subclasses ``UserWarning`` so callers can filter or escalate it with the
    standard ``warnings`` machinery without affecting other user warnings.
    """
# helpers
def highlight(txt: str, indices: List[int]) -> str:
    """Highlight txt at specified indices with HTML <mark> tags.

    Consecutive highlighted characters share a single <mark>...</mark> pair.

    Args:
        txt: The text to decorate.
        indices: Zero-based character positions in ``txt`` to highlight.
            Positions that do not occur in ``txt`` are ignored.

    Returns:
        ``txt`` with <mark> tags inserted, or ``txt`` unchanged when
        ``indices`` is empty.

    Note:
        ``txt`` is inserted verbatim; it is not HTML-escaped here.
    """
    if not indices:
        return txt
    highlight_set: Set[int] = set(indices)
    parts: List[str] = []
    in_mark = False
    for i, char in enumerate(txt):
        hit = i in highlight_set
        # Emit a tag only where a highlighted run starts or ends. Merging
        # runs this way (instead of stripping '</mark><mark>' afterwards)
        # leaves any such literal sequence in the source text untouched.
        if hit != in_mark:
            parts.append("<mark>" if hit else "</mark>")
            in_mark = hit
        parts.append(char)
    if in_mark:
        # The text ended inside a highlighted run; close it.
        parts.append("</mark>")
    return "".join(parts)
def decorate(
    df: pd.DataFrame,
    idx: List[int],
    scores: List[int],
    highlights: List[List[int]],
    col: str,
    score_col: str
) -> pd.DataFrame:
    """
    Filters a DataFrame based on search results and adds score and highlight columns.

    Args:
        df: The original DataFrame.
        idx: A list of integer positions (as used by ``DataFrame.iloc``) for the matched rows.
        scores: A list of scores, one per matched row, in the same order as ``idx``.
        highlights: A list of lists, one per matched row, containing character indices to highlight.
        col: The name of the column in the DataFrame to apply highlighting to.
        score_col: The name of the column that receives ``scores``.

    Returns:
        A new DataFrame containing only the matched rows, with a ``score_col``
        column and ``col`` replaced by its HTML-highlighted form. ``df`` itself
        is not modified.
    """
    decorated_df = df.iloc[idx].copy()
    decorated_df[score_col] = scores
    decorated_df[col] = [
        # Character indices only make sense against text, so a non-string cell
        # (e.g. NaN or a number) is highlighted via its str() form instead of
        # failing inside highlight(). Cells without indices are left untouched.
        highlight(text if isinstance(text, str) else str(text), hl_indices)
        if hl_indices else text
        for text, hl_indices in zip(decorated_df[col], highlights)
    ]
    return decorated_df
# main entry point
def _apply_regex_filters(df: pd.DataFrame, clauses, config) -> pd.DataFrame:
    """Narrow df to the rows matching every (field, pattern) regex clause."""
    for field, pattern in clauses:
        # The pseudo-field 'BANG' stands for the configured bang_field.
        col = config.bang_field if field == 'BANG' else field
        if not col or col not in df.columns:
            raise ValueError(f"Invalid column for regex search: '{col}'")
        try:
            df = df.loc[df[col].astype(str).str.contains(
                pattern, regex=True, case=False, na=False)]
        except re.error:
            # A malformed pattern skips this clause instead of failing the query.
            logger.warning("Regex error with pattern '%s' — ignoring.", pattern)
    return df


def _apply_date_filters(df: pd.DataFrame, date_filters, config) -> pd.DataFrame:
    """Narrow df to the rows whose date column falls inside each date filter's range."""
    for date_filter in date_filters:
        col = date_filter['field'] or config.default_date_field
        if not col or col not in df.columns:
            warnings.warn('No valid field for date spec - ignoring.',
                          QuerexfuzzConfigurationWarning)
            continue
        if not pd.api.types.is_datetime64_any_dtype(df[col]):
            df = df.copy()  # copy only when in-place type coercion is needed
            df[col] = pd.to_datetime(df[col], errors='coerce')
        start_date, end_date = resolve_date_range(date_filter)
        if df[col].dt.tz is None:
            # Naive column: strip tzinfo from the bounds so they are comparable.
            start_date = start_date.replace(tzinfo=None)
            end_date = end_date.replace(tzinfo=None)
        df = df.loc[df[col].between(start_date, end_date)]
    return df


def _get_fuzzy_matcher(original_df: pd.DataFrame, fuzzy_conf, engine):
    """Return (search_cols, matcher) for original_df; matcher is None if no column is searchable."""
    df_id = id(original_df)
    # Matchers are cached per frame identity, but never for frames the engine
    # lists in _mutable_ids.
    use_cache = engine is not None and df_id not in engine._mutable_ids
    if use_cache:
        cached = engine._fuzzy_cache.get(df_id)
        if cached is not None:
            # Fast path: warm cache — skip all data prep.
            logger.debug('reusing cached fuzzy matcher')
            return cached

    if fuzzy_conf.fields == 'all':
        search_cols = original_df.select_dtypes(include='object').columns.tolist()
    elif fuzzy_conf.fields:
        search_cols = [c for c in fuzzy_conf.fields if c in original_df.columns]
    else:
        search_cols = []
    if not search_cols:
        warnings.warn('No valid field for fuzzy search - ignoring pattern.',
                      QuerexfuzzConfigurationWarning)
        return search_cols, None

    if len(search_cols) == 1:
        search_list = original_df[search_cols[0]].astype(str).to_list()
    else:
        # Several columns: search over their space-joined string forms.
        search_list = original_df[search_cols].apply(
            lambda row: ' '.join(row.astype(str)), axis=1).to_list()
    logger.info('building fuzzy matcher...')
    matcher_cls = FuzzyMatcherMultiHi if fuzzy_conf.highlight else FuzzyMatcherMulti
    matcher = matcher_cls(search_list)
    if use_cache:
        engine._fuzzy_cache[df_id] = (search_cols, matcher)
    return search_cols, matcher


def _fuzzy_search(df: pd.DataFrame, original_df: pd.DataFrame, spec: dict,
                  fuzzy_conf, search_cols, matcher) -> pd.DataFrame:
    """Query matcher and return the matched rows of original_df with a score column."""
    limit = spec['top'] if spec['top'] > 0 else fuzzy_conf.limit
    # When pre-filters narrowed df, overfetch then intersect with valid positions.
    has_prefilters = bool(spec['where']) or bool(spec['regex']) or bool(spec['dates'])
    if has_prefilters:
        fetch_limit = limit * 5
        # NOTE(review): Index.get_indexer needs a unique index on original_df —
        # confirm callers guarantee that.
        valid_pos = {i for i in original_df.index.get_indexer(df.index) if i >= 0}
    else:
        fetch_limit = limit
        valid_pos = None

    highlights = None
    if fuzzy_conf.highlight:
        indices, scores, highlights = matcher.query(spec['fuzzy'], fetch_limit)
        hits = zip(indices, scores, highlights)
    else:
        indices, scores = matcher.query(spec['fuzzy'], fetch_limit)
        hits = zip(indices, scores)

    if valid_pos is not None:
        # Keep only hits that survived the pre-filters, then cut back to limit.
        kept = [hit for hit in hits if hit[0] in valid_pos][:limit]
        indices = [hit[0] for hit in kept]
        scores = [hit[1] for hit in kept]
        if fuzzy_conf.highlight:
            highlights = [hit[2] for hit in kept]

    if fuzzy_conf.highlight and len(search_cols) == 1:
        # Highlight markup is only applied when a single column was searched.
        return decorate(original_df, indices, scores, highlights,
                        search_cols[0], fuzzy_conf.score_col_name)
    matched = original_df.iloc[indices].copy()
    matched[fuzzy_conf.score_col_name] = scores
    return matched


def _select_fields(df: pd.DataFrame, sel: dict, config) -> list:
    """Resolve the select clause into the ordered list of columns to return."""
    # `select *`    → base columns (default when no select clause)
    # `select **`   → all columns
    # `select a, b` → named columns
    # `select *, a` → base columns plus a
    # `select *, -a`→ base columns minus a (- or ! prefix)
    # `select **,-a`→ all columns minus a
    if sel['include'] or sel['exclude']:
        if '__all__' in sel['include']:
            fields = list(df.columns)
        elif '__base__' in sel['include'] or not sel['include']:
            fields = [c for c in config.base_cols if c in df.columns]
        else:
            fields = []
        named = [c for c in sel['include'] if c in df.columns]
        # dict.fromkeys de-duplicates while preserving first-seen order.
        fields = list(dict.fromkeys(fields + named))
        return [f for f in fields if f not in sel['exclude']]
    if config.base_cols:
        return [f for f in config.base_cols if f in df.columns]
    return list(df.columns)


def execute_query(
    df: pd.DataFrame,
    spec: dict,
    config: QuerexfuzzConfig,
    engine=None,
) -> pd.DataFrame:
    """Apply the parsed query specification to a DataFrame.

    Steps run in order: WHERE filter, regex filters, date filters, fuzzy
    search, sort, top/bottom-N limit, column selection.

    Filter operations (WHERE, regex, date range, sort, head/tail, column select)
    all return new DataFrames, so no upfront copy is needed. The only in-place
    write is date-column type coercion; a copy is made there, after prior filters
    have already reduced the DataFrame.

    original_df is kept as a reference to the full unfiltered frame so the fuzzy
    matcher (cached in engine._fuzzy_cache when an engine is supplied and the
    frame is not listed in engine._mutable_ids) always searches the complete
    data set; pre-filter results are intersected afterwards.

    Args:
        df: The frame to query.
        spec: Parsed query. Keys read here: 'where', 'regex', 'dates', 'fuzzy',
            'top', 'sort', 'flags' and 'select' (with 'include'/'exclude').
        config: Supplies bang_field, default_date_field, recent_field,
            base_cols and the fuzzy settings.
        engine: Optional object providing _fuzzy_cache and _mutable_ids.

    Returns:
        The filtered, sorted, limited frame restricted to the selected columns
        (plus the fuzzy score column when a fuzzy search produced results).

    Raises:
        ValueError: If a regex clause names a column that is not in the frame.
    """
    original_df = df

    # 1. Filter: WHERE clause (SQL-like)
    if spec['where']:
        # NOTE(review): the expression is evaluated by DataFrame.query —
        # confirm it always comes from a trusted source.
        df = df.query(spec['where'])

    # 2. Filter: Regex clauses
    df = _apply_regex_filters(df, spec['regex'], config)

    # 3. Filter: Date clauses
    df = _apply_date_filters(df, spec['dates'], config)

    # 4. Fuzzy Search
    has_fuzzy_results = False
    if spec['fuzzy']:
        fuzzy_conf = config.fuzzy
        search_cols, matcher = _get_fuzzy_matcher(original_df, fuzzy_conf, engine)
        if matcher is not None:
            df = _fuzzy_search(df, original_df, spec, fuzzy_conf, search_cols, matcher)
            logger.info('fuzzy search complete')
            has_fuzzy_results = True
            logger.debug('Applied fuzzy matching')

    # 5. Sort: explicit sort wins, then fuzzy score, then the 'recent' flag.
    sort_cols = [col for col, _ in spec['sort']]
    sort_order = [asc for _, asc in spec['sort']]
    if sort_cols:
        df = df.sort_values(by=sort_cols, ascending=sort_order)
    elif has_fuzzy_results:
        df = df.sort_values(by=config.fuzzy.score_col_name, ascending=False)
    elif 'recent' in spec['flags']:
        recent = config.recent_field
        # sort_values accepts column labels and index level names; anything
        # else would raise KeyError, so warn and skip like the date filter does.
        if recent and (recent in df.columns or recent in df.index.names):
            df = df.sort_values(by=recent, ascending=False)
        else:
            warnings.warn('No valid recent field - recent sort ignored.',
                          QuerexfuzzConfigurationWarning)

    # 6. Limit (Top N): positive keeps the first N rows, negative the last N.
    if spec['top'] > 0:
        df = df.head(spec['top'])
    elif spec['top'] < 0:
        df = df.tail(-spec['top'])

    # 7. Select Columns
    final_fields = _select_fields(df, spec['select'], config)
    # The score column is always returned when a fuzzy search ran.
    if has_fuzzy_results and config.fuzzy.score_col_name not in final_fields:
        final_fields.append(config.fuzzy.score_col_name)
    return df[final_fields]