ByteHunter
A script I made to efficiently search through a lot of files, locating a specific word plus its occurrence count and location in each file. I originally needed this to dig through huge documentation sets, hundreds and hundreds of files... It searches many files at once using multithreading, so it can tax your CPU and disk.
You can find the git with the code here: https://github.com/VladoPortos/ByteHunter
How to use it?
This Python script is designed to efficiently find occurrences of a specific term within a designated set of directories. It offers customization options and provides a detailed CSV file as output.
Features
- Concurrent Search: Employs multithreading for optimized search performance across multiple files.
- Customizable: Adjust these parameters for a tailored search experience:
  - `SEARCH_TERM`: Your desired search term. The search is case-insensitive.
  - `DIRECTORIES_TO_SEARCH`: Target directories for the search.
  - `EXCLUDE_DIRECTORIES`: Directories to omit from the search.
  - `INCLUDE_FILE_TYPES`: Explicitly limit the search to specific file types.
  - `EXCLUDE_FILE_TYPES`: Exclude certain file types from the search.
  - `NUM_THREADS`: Number of threads to employ in parallel operations.
- Clear Output: Generates a CSV file (`search_results.csv`) detailing:
  - File paths containing matches.
  - Total occurrence count in each file.
  - Specific line numbers and positions of the search term.
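For instance, a search for "API" might produce rows like these (file names and numbers are purely illustrative; each entry in Positions is a `(line_number, start, end)` tuple):

```csv
File Path,Occurrence Count,Positions
project_directory/main.py,2,"[(3, 10, 13), (17, 4, 7)]"
project_directory/README.md,1,"[(12, 25, 28)]"
```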
How to Use
- Install Requirements: The script only uses Python's standard library; in particular, `concurrent.futures` has shipped with Python since 3.2, so there is nothing to install.
- Modify Configuration: Adapt the variables within the 'Configuration' section of the script to match your search preferences.
- Run the Script: Execute the Python script from your terminal (e.g., `python main.py`).
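A run prints a short progress summary; the counts and timing below are illustrative, but the message formats come straight from the script:

```text
$ python main.py
Total files collected: 1842
Total searchable files: 1315
Search completed. Results written to search_results.csv
Script execution time: 4.37 seconds
```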
Example
Suppose you want to find all instances of "API" within `.py`, `.md`, and `.txt` files in your project directory, excluding a "docs" subdirectory:
```python
SEARCH_TERM = "API"
DIRECTORIES_TO_SEARCH = [Path("./project_directory")]
EXCLUDE_DIRECTORIES = {Path("./project_directory/docs")}
INCLUDE_FILE_TYPES = [".py", ".md", ".txt"]
```

(Note: the script compares `Path` objects when pruning excluded directories, so use `Path(...)` here rather than plain strings.)
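To post-process the results programmatically, here is a minimal sketch, assuming the default `search_results.csv` output name; the Positions column is the `str()` of a list of `(line_number, start, end)` tuples, so `ast.literal_eval` can parse it back:

```python
import ast
import csv

# Read the results file and print each match location.
with open("search_results.csv", newline="", encoding="utf-8") as csvfile:
    for row in csv.DictReader(csvfile):
        print(f"{row['File Path']}: {row['Occurrence Count']} occurrence(s)")
        # Positions holds the str() of a list of (line, start, end) tuples.
        for line_number, start, end in ast.literal_eval(row["Positions"]):
            print(f"  line {line_number}, columns {start}-{end}")
```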
Notes
- The script filters out files that appear to be binary or otherwise non-text (it tries to decode the first 1 KiB as UTF-8) to keep the search focused on readable files.
- Ensure you have permission to read files in the given directories.
Code:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from pathlib import Path
import csv
import re
import time
from typing import List, Tuple
import os

# Configuration
SEARCH_TERM = "whatever"

# Use Path objects for directories
DIRECTORIES_TO_SEARCH = [Path("./")]

# EXCLUDE_DIRECTORIES = {Path("path/to/exclude/dir1"), Path("path/to/exclude/dir2")}
EXCLUDE_DIRECTORIES = set()  # No directories to exclude by default

# INCLUDE_FILE_TYPES = [".txt", ".py", ".md", ".yml", ".yaml", ".json"]
INCLUDE_FILE_TYPES = []  # Include all file types by default

# EXCLUDE_FILE_TYPES = [".json"]  # Exclude JSON files
EXCLUDE_FILE_TYPES = []  # No file types to exclude by default

OUTPUT_CSV = "search_results.csv"

# Dynamically set based on system, with a fallback
NUM_THREADS = os.cpu_count() or 10

# Dictionary for accumulating results, guarded by a lock for thread safety
results_dict = {}
lock = threading.Lock()
def is_file_searchable(file_path: Path) -> bool:
    """
    Determine if a file should be included in the search based on its path.

    Args:
        file_path (Path): The path of the file to check.

    Returns:
        bool: True if the file meets the criteria for searching, False otherwise.
    """
    # Path.is_relative_to requires Python 3.9+.
    if any(file_path.is_relative_to(excl_dir) for excl_dir in EXCLUDE_DIRECTORIES):
        return False
    if INCLUDE_FILE_TYPES and file_path.suffix not in INCLUDE_FILE_TYPES:
        return False
    if file_path.suffix in EXCLUDE_FILE_TYPES:
        return False
    try:
        # Probe the first 1 KiB as UTF-8; a UnicodeDecodeError suggests a binary file.
        with file_path.open('r', encoding='utf-8') as file:
            file.read(1024)
        return True
    except UnicodeDecodeError:
        return False
    except Exception as e:
        print(f"Warning: Unable to read {file_path} due to {e}")
        return False
def search_file(file_path: Path) -> None:
    """
    Search for occurrences of the SEARCH_TERM in a given file and store the results.

    Args:
        file_path (Path): The path of the file to be searched.
    """
    local_results: List[Tuple[int, int, int]] = []
    try:
        with file_path.open('r', encoding='utf-8') as file:
            for line_number, line in enumerate(file, start=1):
                # re.escape treats the search term literally, even if it
                # contains regex metacharacters such as "." or "+".
                for match in re.finditer(re.escape(SEARCH_TERM), line, re.IGNORECASE):
                    local_results.append(
                        (line_number, match.start(), match.end()))
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
    if local_results:
        with lock:
            results_dict[str(file_path)] = local_results
def write_to_csv() -> None:
    """
    Writes the search results stored in results_dict to a CSV file.
    """
    with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['File Path', 'Occurrence Count', 'Positions']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for file_path, positions in results_dict.items():
            writer.writerow({
                'File Path': file_path,
                'Occurrence Count': len(positions),
                # Each position is a (line_number, start, end) tuple.
                'Positions': str(positions)
            })
def collect_files(root_dir: Path):
    """
    Collect all file paths from the given root directory that match the search criteria.

    Args:
        root_dir (Path): The root directory from which to collect file paths.

    Yields:
        Path: The file paths collected from the root directory.
    """
    for root, dirs, files in os.walk(root_dir, topdown=True):
        # Prune excluded directories in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if Path(root) / d not in EXCLUDE_DIRECTORIES]
        for file in files:
            yield Path(root) / file
def main():
    """
    Main function to orchestrate the file search process, from collecting
    files to writing results to CSV.
    """
    all_files = []
    for root_dir in DIRECTORIES_TO_SEARCH:
        all_files.extend(collect_files(root_dir))
    print(f"Total files collected: {len(all_files)}")

    # First pass: filter out binary/unreadable files in parallel.
    files_to_search = []
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        future_to_file = {executor.submit(is_file_searchable, file_path): file_path
                          for file_path in all_files}
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            if future.result():
                files_to_search.append(file_path)
    print(f"Total searchable files: {len(files_to_search)}")

    # Second pass: search the surviving files in parallel.
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        executor.map(search_file, files_to_search)

    write_to_csv()
    print(f"Search completed. Results written to {OUTPUT_CSV}")
if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Script execution time: {execution_time:.2f} seconds")
```
Was it useful to you? Maybe buy me a drink 😃