A memory image taken from a live system is the closest thing to a complete picture of what was happening on that machine at a specific moment in time. Everything that was running, communicating, injected, or hiding is captured in those bytes. The challenge is that extracting useful intelligence from a raw memory image manually is slow. Running Volatility plugins one at a time, copying outputs into notes, correlating findings by hand. A thorough manual analysis of a single 16GB image can take three to four hours for an experienced analyst. When you are hunting across multiple endpoints, or when you need to answer the question “is anything malicious in this image” quickly enough to act on the result, manual analysis does not scale.
This post covers building a single script that accepts a memory image path and produces a comprehensive, prioritised threat hunting report without any further analyst input. You drop an image in, you get a report out. The script handles plugin selection, parallel execution, result parsing, anomaly detection, IOC extraction, cross-referencing between plugins, and report generation. It handles both Windows and Linux images, uses Volatility 2 where it provides unique capability and Volatility 3 for everything else, and produces both a shareable HTML report and structured JSON for SIEM ingestion.
Volatility 2 versus Volatility 3: understanding when to use each
The Volatility project has two major versions in active use. Volatility 3 is the current version, written from scratch with a cleaner architecture, better Python 3 support, and no dependency on a manually selected profile. Volatility 2 is the older version, maintained for specific use cases where it still provides capabilities or plugin depth that Volatility 3 does not yet match. Understanding the practical differences between them determines which one to call for each analysis task.
Volatility 3 should be your default for almost everything. It automatically identifies the OS version and kernel structures from the image without requiring a manually matched profile, which eliminates the most common source of beginner errors (wrong profile). It runs significantly faster on large images because of improved memory mapping. The output format is consistent JSON across all plugins, which makes automation straightforward. The symbol table system is more robust than Volatility 2’s profile system for modern Windows versions.
Volatility 2 retains specific advantages that keep it relevant for threat hunters:
# Where Volatility 2 still provides unique value
# 1. Some plugins simply do not exist yet in Volatility 3
# - mftparser: deep MFT parsing from memory
# - shellbags: user navigation history from registry in memory
# - iehistory: Internet Explorer artefacts from process memory
# - shimcache: application compatibility cache from memory
# - prefetchparser: prefetch execution records from memory
# 2. Certain Linux plugins are more mature in Volatility 2
# - linux_psaux: full command line for Linux processes (more reliable)
# - linux_netstat: Linux network connections with more detail
# - linux_check_syscall: syscall table hook detection
# 3. Legacy Windows support (XP, Vista, 2003, 2008)
# Volatility 3 symbol tables may not exist for very old OS versions
# Volatility 2 with the correct profile covers these reliably
# 4. Certain rootkit detection plugins
# - modscan: raw kernel module scanner (sometimes finds what pslist misses)
# - ssdt: SSDT hook detection (useful on older Windows)
# - apihooks: API hook detection in process memory
# Check what is available in each version
vol2 --info | grep "^[A-Z]" | sort # Volatility 2 plugins
vol3 --help 2>&1 | grep "windows\." | sort # Volatility 3 Windows plugins
The automation script uses both. Volatility 3 for the core analysis pipeline where it is faster and more reliable. Volatility 2 for specific supplementary plugins where it provides unique value. The script detects which tools are available and gracefully skips plugins that cannot run rather than failing entirely.
Setting up both versions side by side
#!/bin/bash
## setup_volatility_dual.sh
## Install Volatility 2 and 3 side by side
## Ubuntu 22.04 - run as root or with sudo
set -euo pipefail
echo "[*] Setting up dual Volatility environment"
## ── Volatility 3 ─────────────────────────────────────────────────────────────
python3 -m venv /opt/vol3-env
source /opt/vol3-env/bin/activate
pip install --upgrade pip -q
pip install volatility3 yara-python pefile capstone python-magic requests -q
# Symbol tables for Windows analysis
mkdir -p /opt/vol3-symbols
if [ ! -d /opt/vol3-symbols/windows ]; then
echo "[*] Downloading Windows symbol tables (~500MB)"
wget -q --show-progress \
https://downloads.volatilityfoundation.org/volatility3/symbols/windows.zip \
-O /tmp/windows_syms.zip
unzip -q /tmp/windows_syms.zip -d /opt/vol3-symbols/
rm /tmp/windows_syms.zip
fi
# Link symbols into Volatility 3
SITE3=$(python3 -c "import site; print(site.getsitepackages()[0])")
ln -sf /opt/vol3-symbols/windows $SITE3/volatility3/symbols/windows 2>/dev/null || true
# Linux symbol tables (requires ISF files generated from target kernel)
# These must be generated for each specific kernel version being analysed
# See: https://github.com/volatilityfoundation/dwarf2json
mkdir -p /opt/vol3-symbols/linux
echo "[*] Volatility 3 installed: $(vol)"
deactivate
## ── Volatility 2 ─────────────────────────────────────────────────────────────
# Volatility 2 requires Python 2.7 (use pyenv or system python2)
apt-get install -y -qq python2 python2-dev python-is-python2 2>/dev/null || \
apt-get install -y -qq python2.7 python2.7-dev 2>/dev/null || true
# If python2 is not available, use Docker instead
if command -v python2 &>/dev/null; then
pip2 install --quiet distorm3 pycryptodome 2>/dev/null || true
git clone --depth 1 https://github.com/volatilityfoundation/volatility.git \
/opt/volatility2 2>/dev/null || (cd /opt/volatility2 && git pull)
echo "[+] Volatility 2: available via python2 /opt/volatility2/vol.py"
else
echo "[!] Python 2 not available - using Docker for Volatility 2"
docker pull remnux/volatility 2>/dev/null || true
cat > /usr/local/bin/vol2 << 'VOL2SCRIPT'
#!/bin/bash
docker run --rm -v "$(dirname $(realpath )):/data" \
remnux/volatility "$@"
VOL2SCRIPT
chmod +x /usr/local/bin/vol2
fi
## ── Supporting tools ─────────────────────────────────────────────────────────
source /opt/vol3-env/bin/activate
pip install -q \
jinja2 \
requests \
psycopg2-binary \
colorama \
tqdm \
tabulate
deactivate
# Install yara, foremost, jq from apt
apt-get install -y -qq yara foremost jq
echo ""
echo "[+] Setup complete"
echo " Volatility 3: vol (in /opt/vol3-env/bin/)"
echo " Volatility 2: python2 /opt/volatility2/vol.py (or vol2 Docker wrapper)"
echo " Symbol tables: /opt/vol3-symbols/"
The automation architecture
The script runs in five sequential phases. Each phase feeds its results into the next. Phase 1 identifies the image and OS type. Phase 2 runs a broad plugin sweep to collect all available data. Phase 3 applies anomaly detection logic across the collected data to score and prioritise findings. Phase 4 extracts and enriches IOCs. Phase 5 generates the output reports. The whole process runs in parallel where possible, with non-dependent plugins executing concurrently, which reduces total analysis time significantly compared to sequential execution.
## File: analyse.py
#!/usr/bin/env python3
## /opt/memory-hunter/analyse.py
## THE ONE SCRIPT TO RUN
## Usage: python3 analyse.py /path/to/memory.raw
## Usage: python3 analyse.py /path/to/memory.raw --workers 16 --no-vol2
import sys
import argparse
import logging
import time
import json
from datetime import datetime
from pathlib import Path
# Add scripts directory to path
sys.path.insert(0, str(Path(__file__).parent / 'scripts'))
from phase1_identify import identify_image
from vol_runner import collect_all_plugins
from anomaly_checks import run_all_checks
from phase4_iocs import extract_iocs
from phase5_report import generate_reports
def setup_logging(log_file: str = None) -> None:
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
handlers.append(logging.FileHandler(log_file))
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=handlers
)
def print_banner():
print("""
╔══════════════════════════════════════════════════════╗
║ Memory Hunter - Automated Analysis ║
║ Volatility 2 + 3 | Windows + Linux | HTML + JSON ║
╚══════════════════════════════════════════════════════╝
""")
def print_summary(report_info: dict, elapsed: float):
risk = report_info['risk_label']
colour = {
'CRITICAL': '3[91m', 'HIGH': '3[93m',
'MEDIUM': '3[33m', 'LOW': '3[92m'
}.get(risk, '')
reset = '3[0m'
print(f"""
{'='*60}
ANALYSIS COMPLETE ({elapsed:.1f} seconds)
{'='*60}
Risk Level: {colour}{risk}{reset}
Risk Score: {report_info['risk_score']}
Findings: {report_info['finding_count']}
HTML Report: {report_info['html']}
JSON Report: {report_info['json']}
{'='*60}
""")
def main():
print_banner()
parser = argparse.ArgumentParser(
description='Automated memory image analysis for threat hunters'
)
parser.add_argument('image', help='Path to memory image file')
parser.add_argument('--workers', type=int, default=8,
help='Parallel plugin workers (default: 8)')
parser.add_argument('--no-vol2', action='store_true',
help='Skip Volatility 2 plugins')
parser.add_argument('--output-dir', default='/opt/memory-hunter/reports',
help='Report output directory')
parser.add_argument('--yara-rules', default='/opt/memory-hunter/yara_rules/combined.yar',
help='Path to compiled Yara rules')
parser.add_argument('--log-file', help='Write log to file')
parser.add_argument('--quiet', action='store_true',
help='Reduce output verbosity')
args = parser.parse_args()
setup_logging(args.log_file)
log = logging.getLogger(__name__)
start_time = time.time()
image_path = str(Path(args.image).absolute())
print(f"[*] Image: {image_path}")
print(f"[*] Workers: {args.workers}")
print()
# ── Phase 1: Identify ────────────────────────────────────────────────────
print("[Phase 1/5] Identifying image...")
meta = identify_image(image_path)
print(f" OS Type: {meta.get('os_type', 'unknown')}")
print(f" OS Version: {meta.get('os_version', 'unknown')}")
print(f" Vol2 Profile: {meta.get('vol2_profile', 'NOT DETECTED - Vol2 plugins will not run')}")
print(f" Size: {meta.get('size_gb', 0):.1f} GB")
print(f" SHA256: {meta.get('sha256', '')[:32]}...")
if not meta.get('vol2_ok'):
print(f" WARNING: Vol2 not available. For older images (Win XP/7/Vista)")
print(f" install Vol2 and run: vol2 -f <image> imageinfo")
print()
if not meta.get('os_type'):
print("[!] Could not identify OS type - check symbol tables")
print(" Windows: ensure /opt/vol3-symbols/windows/ is populated")
print(" Linux: ensure ISF file exists for this kernel version")
sys.exit(1)
# ── Phase 2: Collect ─────────────────────────────────────────────────────
print(f"[Phase 2/5] Running plugin collection ({args.workers} workers)...")
plugin_results = collect_all_plugins(image_path, meta, args.workers)
print(f" Plugins completed: {len(plugin_results)}")
print()
# ── Phase 3: Detect ──────────────────────────────────────────────────────
print("[Phase 3/5] Running anomaly detection...")
findings, risk_score = run_all_checks(plugin_results)
critical = sum(1 for f in findings if f.severity == 'CRITICAL')
high = sum(1 for f in findings if f.severity == 'HIGH')
print(f" Findings: {len(findings)} total ({critical} critical, {high} high)")
if findings:
print(" Top findings:")
for f in findings[:5]:
print(f" [{f.severity}] {f.title}")
print()
# ── Phase 4: IOCs ────────────────────────────────────────────────────────
print("[Phase 4/5] Extracting IOCs...")
suspicious_pids = [f.pid for f in findings if f.pid]
iocs = extract_iocs(plugin_results, image_path, suspicious_pids)
ioc_count = sum(len(v) for v in iocs.values())
print(f" IOCs extracted: {ioc_count}")
if iocs.get('yara_hits'):
print(f" YARA hits: {len(iocs['yara_hits'])}")
for hit in iocs['yara_hits'][:3]:
print(f" -> {hit}")
print()
# ── Phase 5: Report ──────────────────────────────────────────────────────
print("[Phase 5/5] Generating reports...")
report_info = generate_reports(meta, findings, iocs, plugin_results)
elapsed = time.time() - start_time
print_summary(report_info, elapsed)
print(f" summary.txt: {report_info.get('summary', '')}")
print(f" iocs.txt: {report_info.get('iocs', '')}")
# Exit code reflects risk level for CI/CD integration
exit_codes = {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}
sys.exit(exit_codes.get(report_info['risk_label'], 0))
if __name__ == '__main__':
main()
Phase 1: image identification and OS detection
## File: phase1_identify.py
#!/usr/bin/env python3
"""
Phase 1: Identify memory image OS type and version.
Tries Volatility 3 first, then Volatility 2 imageinfo for profile detection.
Vol2 profile is required for older images (XP/Vista/Win7) where Vol3 symbols
do not fully match, and for enabling Vol2-only plugins.
"""
import subprocess
import json
import logging
import hashlib
import shutil
from pathlib import Path
from typing import Dict
log = logging.getLogger(__name__)
VOL3_CMD = '/opt/vol3-env/bin/vol'
VOL2_CMD = '/opt/volatility2/vol.py'
def sha256_file(path: str) -> str:
h = hashlib.sha256()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b''):
h.update(chunk)
return h.hexdigest()
def run_vol2_imageinfo(image_path: str) -> str:
"""
Run vol2 imageinfo and return the first suggested profile string.
Handles the exact Vol2 output format:
INFO : volatility.debug : Suggested Profile(s) : Win7SP1x64, Win7SP0x64
Output goes to STDERR in Vol2 - we capture both stdout and stderr.
Returns empty string if profile cannot be determined.
"""
# Find vol2 - check wrapper script first, then direct python2 call
vol2_bin = shutil.which('vol2')
if not vol2_bin and Path(VOL2_CMD).exists():
py2 = shutil.which('python2') or shutil.which('python2.7')
if not py2:
log.warning("vol2 not found and python2/python2.7 not in PATH")
return ''
if not vol2_bin and not (shutil.which('python2') or shutil.which('python2.7')):
log.warning("vol2 not available")
return ''
cmd = ['vol2', '-f', image_path, 'imageinfo'] if vol2_bin else [
shutil.which('python2') or shutil.which('python2.7'),
VOL2_CMD, '-f', image_path, 'imageinfo'
]
log.info(f"Running: {' '.join(cmd[:4])} imageinfo")
try:
r = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=300
)
# Vol2 writes the Suggested Profile line to STDERR
# stdout may contain it too depending on version - check both
combined = (r.stderr or '') + (r.stdout or '')
for line in combined.splitlines():
# Match: " Suggested Profile(s) : Win7SP1x64, Win7SP0x64..."
# or: "INFO : volatility.debug : Suggested Profile(s) : Win7SP1x64"
if 'Suggested Profile' in line and ':' in line:
# rsplit on last colon to get the profile list
profile_str = line.rsplit(':', 1)[-1].strip()
# Take first profile from comma-separated list
profile = profile_str.split(',')[0].strip()
if profile and len(profile) > 3:
log.info(f"Vol2 profile detected: {profile}")
return profile
log.warning("vol2 imageinfo ran but no Suggested Profile line found")
log.debug(f"Vol2 stderr (first 400 chars): {r.stderr[:400]}")
log.debug(f"Vol2 stdout (first 200 chars): {r.stdout[:200]}")
return ''
except subprocess.TimeoutExpired:
log.warning("vol2 imageinfo timed out after 300 seconds")
return ''
except FileNotFoundError:
log.warning("vol2 command not found")
return ''
except Exception as e:
log.warning(f"vol2 imageinfo error: {e}")
return ''
def identify_image(image_path: str) -> Dict:
"""
Identify OS type, version, and architecture from a memory image.
Returns metadata dict used by all subsequent pipeline phases.
"""
path = Path(image_path)
if not path.exists():
raise FileNotFoundError(f"Image not found: {image_path}")
size_gb = path.stat().st_size / (1024 ** 3)
log.info(f"Image: {path.name} ({size_gb:.1f} GB)")
result = {
'path': str(path.absolute()),
'filename': path.name,
'size_gb': round(size_gb, 2),
'sha256': sha256_file(image_path),
'os_type': None,
'os_version': None,
'arch': None,
'vol3_ok': False,
'vol2_ok': False,
'vol3_info': {},
'vol2_profile': None,
'errors': [],
}
# ── Volatility 3: attempt OS identification ───────────────────────────────
log.info("Running windows.info to identify OS")
cmd = [VOL3_CMD, '-f', image_path, '--renderer', 'json', 'windows.info']
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if r.returncode == 0 and r.stdout.strip():
data = json.loads(r.stdout)
rows = data.get('rows', data) if isinstance(data, dict) else data
if rows:
info = {}
for row in rows:
if isinstance(row, list) and len(row) >= 2:
info[row[0]] = row[1]
elif isinstance(row, dict):
info.update(row)
if info:
result['os_type'] = 'windows'
result['vol3_ok'] = True
result['vol3_info'] = info
result['os_version'] = info.get(
'NtBuildLab',
info.get('Kernel Version', 'Unknown')
)
result['arch'] = '64-bit' if '64' in str(
info.get('Kernel Base', '')
) else '32/64-bit'
log.info(f"Vol3 identified: Windows {result['os_version']}")
except subprocess.TimeoutExpired:
result['errors'].append("windows.info timed out")
except json.JSONDecodeError as e:
result['errors'].append(f"windows.info JSON parse error: {e}")
except Exception as e:
result['errors'].append(f"windows.info error: {e}")
# ── Try Linux identification if Windows failed ────────────────────────────
if not result['os_type']:
try:
cmd_linux = [VOL3_CMD, '-f', image_path, '--renderer', 'json', 'linux.bash']
r = subprocess.run(cmd_linux, capture_output=True, text=True, timeout=60)
if r.returncode == 0 and r.stdout.strip():
result['os_type'] = 'linux'
result['vol3_ok'] = True
log.info("Identified: Linux image")
except Exception:
pass
# ── Volatility 2: always run imageinfo to get profile ────────────────────
# Required for:
# - Older images (XP/Vista/Win7) where Vol3 symbols partially match
# - Vol2-only plugins (shimcache, mftparser, prefetchparser etc.)
# - Stable sequential plugin execution on uncertain images
profile = run_vol2_imageinfo(image_path)
if profile:
result['vol2_ok'] = True
result['vol2_profile'] = profile
# If Vol3 did not identify the OS, use the Vol2 profile to set it
if not result['os_type']:
result['os_type'] = 'linux' if 'Linux' in profile else 'windows'
result['os_version'] = profile
log.info(f"OS identified via Vol2: {profile}")
else:
log.warning("Vol2 profile not detected - Vol2 plugins will be skipped")
log.warning("Ensure vol2 is installed: which vol2 && vol2 --info")
if not result['os_type']:
result['errors'].append("Could not identify OS type from image")
log.error("Image identification failed - check symbol tables and vol2 installation")
return result
Phase 2: parallel plugin collection
## File: vol_runner.py
#!/usr/bin/env python3
"""
vol_runner.py - Volatility plugin execution abstraction layer.
Handles Vol2/Vol3 differences, parallel execution with race condition
prevention, and result normalisation.
"""
import subprocess
import json
import logging
import concurrent.futures
import shutil
from typing import List, Dict, Optional, Tuple
from pathlib import Path
log = logging.getLogger(__name__)
VOL3 = '/opt/vol3-env/bin/vol'
VOL2 = '/opt/volatility2/vol.py'
# (vol3_plugin, vol2_plugin, timeout_seconds, description)
WINDOWS_PLUGINS = [
('windows.pslist', 'pslist', 120, 'Process list'),
('windows.pstree', 'pstree', 120, 'Process tree'),
('windows.cmdline', 'cmdline', 180, 'Command line arguments'),
('windows.netscan', 'netscan', 180, 'Network connections'),
('windows.netstat', None, 120, 'Network statistics'),
('windows.malfind', 'malfind', 900, 'Injection detection'),
('windows.vadinfo', 'vadinfo', 300, 'VAD region analysis'),
('windows.dlllist', 'dlllist', 300, 'Loaded DLL list'),
('windows.svcscan', 'svcscan', 180, 'Windows services'),
('windows.scheduled_tasks', None, 120, 'Scheduled tasks'),
('windows.registry.hivelist','hivelist', 120, 'Registry hives'),
('windows.registry.printkey',None, 120, 'Registry run keys'),
('windows.callbacks', None, 180, 'Kernel callbacks'),
('windows.modules', 'modules', 120, 'Loaded kernel modules'),
('windows.driverscan', 'driverscan', 180, 'Driver scan'),
('windows.ssdt', 'ssdt', 120, 'SSDT entries'),
('windows.handles', 'handles', 900, 'Open handles'),
('windows.dumpfiles', None, 300, 'Mapped files'),
('windows.mftscan', None, 300, 'MFT entries'),
('windows.envars', 'envars', 120, 'Environment variables'),
('windows.lsadump', None, 120, 'LSA credentials'),
('windows.hashdump', 'hashdump', 120, 'Password hashes'),
]
LINUX_PLUGINS = [
('linux.pslist', 'linux_pslist', 120, 'Process list'),
('linux.pstree', 'linux_pstree', 120, 'Process tree'),
('linux.bash', 'linux_bash', 120, 'Bash history'),
('linux.netstat', 'linux_netstat', 120, 'Network connections'),
('linux.malfind', None, 600, 'Injection detection'),
('linux.lsmod', 'linux_lsmod', 120, 'Loaded kernel modules'),
('linux.check_syscall', 'linux_check_syscall', 180, 'Syscall table integrity'),
('linux.check_modules', 'linux_check_modules', 120, 'Module integrity'),
('linux.envars', None, 120, 'Environment variables'),
]
# Vol2-only plugins that provide unique value not in Vol3
VOL2_ONLY_WINDOWS = [
('mftparser', 240, 'MFT parser (Vol2)'),
('shimcache', 120, 'Shimcache (Vol2)'),
('prefetchparser', 120, 'Prefetch (Vol2)'),
('iehistory', 120, 'IE history (Vol2)'),
]
VOL2_ONLY_LINUX = [
('linux_psaux', 120, 'Full process args (Vol2)'),
('linux_check_afinfo', 120, 'Network hook detection (Vol2)'),
]
def run_vol3(image_path: str, plugin: str,
timeout: int = 300, extra_args: List[str] = None) -> Tuple[str, List]:
"""Run a Volatility 3 plugin, return (plugin_name, results_list)."""
cmd = [VOL3, '-f', image_path, '--renderer', 'json', plugin]
if extra_args:
cmd.extend(extra_args)
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode != 0 or not r.stdout.strip():
return plugin, []
data = json.loads(r.stdout)
if isinstance(data, dict):
return plugin, data.get('rows', [])
return plugin, data if isinstance(data, list) else []
except subprocess.TimeoutExpired:
log.warning(f"TIMEOUT: {plugin} ({timeout}s)")
return plugin, []
except Exception as e:
log.debug(f"Vol3 error in {plugin}: {e}")
return plugin, []
def run_vol2(image_path: str, plugin: str, profile: str,
timeout: int = 300, extra_args: List[str] = None) -> Tuple[str, str]:
"""Run a Volatility 2 plugin, return (plugin_name, raw_text_output)."""
# Find vol2 binary
vol2_bin = shutil.which('vol2')
if not vol2_bin:
py2 = shutil.which('python2') or shutil.which('python2.7')
if py2 and Path(VOL2).exists():
vol2_bin = py2
cmd = [py2, VOL2, '-f', image_path, f'--profile={profile}', plugin]
else:
return plugin, ''
else:
cmd = [vol2_bin, '-f', image_path, f'--profile={profile}', plugin]
if extra_args:
cmd.extend(extra_args)
try:
r = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=timeout
)
# Vol2 outputs to stdout for data, stderr for info messages
return plugin, r.stdout
except subprocess.TimeoutExpired:
log.warning(f"TIMEOUT (vol2): {plugin} ({timeout}s)")
return plugin, ''
except Exception as e:
log.debug(f"Vol2 error in {plugin}: {e}")
return plugin, ''
def collect_all_plugins(image_path: str, image_info: Dict,
max_workers: int = 8) -> Dict:
"""
Run all applicable plugins in parallel (or sequential for uncertain images).
Returns dict of plugin_name -> results.
"""
os_type = image_info.get('os_type', 'windows')
profile = image_info.get('vol2_profile') or image_info.get('os_version', '')
vol3_ok = image_info.get('vol3_ok', False)
vol2_ok = image_info.get('vol2_ok', False)
vol3_info = image_info.get('vol3_info', {})
# Vol3 symbols are only reliable when vol3_info is populated
vol3_working = vol3_ok and bool(vol3_info)
plugins = WINDOWS_PLUGINS if os_type == 'windows' else LINUX_PLUGINS
vol2_only = VOL2_ONLY_WINDOWS if os_type == 'windows' else VOL2_ONLY_LINUX
# Use sequential execution (1 worker) when Vol3 symbols are uncertain.
# Parallel execution causes a race condition on partially-identified images
# where threads compete to resolve the same kernel structures, producing
# different results on each run.
if not vol3_working and vol2_ok and profile:
effective_workers = 1
log.info("Vol3 symbols uncertain and Vol2 available - running sequentially via Vol2")
elif not vol3_working:
effective_workers = 1
log.info("Vol3 symbols uncertain - running sequentially to prevent race condition")
else:
effective_workers = max_workers
total = len(plugins) + (len(vol2_only) if vol2_ok else 0)
done = 0
results = {}
log.info(f"Running {total} plugins with {effective_workers} worker(s)")
with concurrent.futures.ThreadPoolExecutor(max_workers=effective_workers) as executor:
futures = {}
for vol3_plugin, vol2_plugin, timeout, desc in plugins:
if not vol3_working and vol2_ok and vol2_plugin and profile:
# Vol3 symbols missing - use Vol2 for all plugins that have it
f = executor.submit(run_vol2, image_path, vol2_plugin, profile, timeout)
futures[f] = (vol2_plugin, desc)
elif vol3_ok and vol3_plugin:
# Vol3 identified the image - use Vol3
f = executor.submit(run_vol3, image_path, vol3_plugin, timeout)
futures[f] = (vol3_plugin, desc)
elif vol2_ok and vol2_plugin and profile:
# Vol3 not available - fall back to Vol2
f = executor.submit(run_vol2, image_path, vol2_plugin, profile, timeout)
futures[f] = (vol2_plugin, desc)
# Vol2-only plugins
if vol2_ok and profile:
for vol2_plugin, timeout, desc in vol2_only:
f = executor.submit(run_vol2, image_path, vol2_plugin, profile, timeout)
futures[f] = (vol2_plugin, desc)
for future in concurrent.futures.as_completed(futures):
plugin_name, desc = futures[future]
try:
name, data = future.result()
results[name] = data
done += 1
if isinstance(data, list):
count = len(data)
else:
count = len([l for l in data.splitlines() if l.strip()])
log.info(f" [{done}/{total}] {desc}: {count} records")
except Exception as e:
log.error(f"Plugin {plugin_name} failed: {e}")
results[plugin_name] = []
done += 1
return results
Phase 3: anomaly detection and scoring
## File: anomaly_checks.py
## /opt/memory-hunter/scripts/anomaly_checks.py
## Applies detection logic across collected plugin results
import re
import logging
from typing import Dict, List, Tuple
log = logging.getLogger(__name__)
# Legitimate system processes and their expected parents on Windows
WINDOWS_PROCESS_RULES = {
'system': {'expected_parents': [''], 'expected_path': ''},
'smss.exe': {'expected_parents': ['System'], 'expected_path': r'windows\system32'},
'csrss.exe': {'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'wininit.exe': {'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'winlogon.exe':{'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'services.exe':{'expected_parents': ['wininit.exe'],'expected_path': r'windows\system32'},
'lsass.exe': {'expected_parents': ['wininit.exe'],'expected_path': r'windows\system32'},
'svchost.exe': {'expected_parents': ['services.exe','msiexec.exe'],
'expected_path': r'windows\system32'},
'explorer.exe':{'expected_parents': ['userinit.exe',''],
'expected_path': r'windows'},
'taskhost.exe':{'expected_parents': ['services.exe'],'expected_path': r'windows\system32'},
'spoolsv.exe': {'expected_parents': ['services.exe'],'expected_path': r'windows\system32'},
}
# Office apps that should never spawn shells
SUSPICIOUS_PARENT_CHILD = {
'winword.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe', 'cscript.exe',
'mshta.exe', 'regsvr32.exe', 'rundll32.exe'],
'excel.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe', 'cscript.exe'],
'powerpnt.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'outlook.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'acrord32.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'wmiprvse.exe': ['cmd.exe', 'powershell.exe'],
}
# Paths that are always suspicious for system binaries
SUSPICIOUS_PATHS = [
r'\temp\', r'\tmp\', r'\appdata\',
r'\public\', r'\downloads\', r'\desktop\',
r'\programdata\', r'\recycle'
]
SUSPICIOUS_PORTS = {4444, 8080, 8443, 1337, 31337, 9001, 6667, 4545}
class Finding:
def __init__(self, severity: str, category: str, title: str,
detail: str, pid: int = None, process: str = None):
self.severity = severity # CRITICAL / HIGH / MEDIUM / LOW
self.category = category
self.title = title
self.detail = detail
self.pid = pid
self.process = process
self.score = {'CRITICAL': 40, 'HIGH': 20, 'MEDIUM': 10, 'LOW': 5}[severity]
def to_dict(self):
return {
'severity': self.severity,
'category': self.category,
'title': self.title,
'detail': self.detail,
'pid': self.pid,
'process': self.process,
'score': self.score,
}
def parse_vol2_pslist(raw_text: str) -> List:
"""Parse Volatility 2 pslist text output into a list of process dicts."""
results = []
for line in raw_text.splitlines():
parts = line.split()
if len(parts) >= 7 and parts[0].isdigit():
try:
results.append({
'PID': int(parts[1]) if len(parts) > 1 else 0,
'PPID': int(parts[2]) if len(parts) > 2 else 0,
'ImageFileName': parts[0] if parts else '',
'Path': '',
'CmdLine': ''
})
except (ValueError, IndexError):
pass
elif len(parts) >= 2 and not line.startswith('Offset'):
# Format: Name PID PPID ...
try:
results.append({
'ImageFileName': parts[0],
'PID': int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 0,
'PPID': int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else 0,
'Path': '',
'CmdLine': ''
})
except (ValueError, IndexError):
pass
return results
def check_processes(pslist: List, pstree: List, cmdline: List) -> List[Finding]:
"""Detect process anomalies: masquerading, suspicious parents, unusual paths."""
findings = []
# Build lookup dicts
pid_to_name = {}
pid_to_ppid = {}
pid_to_path = {}
pid_to_cmd = {}
# Handle Vol2 text output
if isinstance(pslist, str):
pslist = parse_vol2_pslist(pslist)
for proc in pslist:
if not isinstance(proc, (list, dict)):
continue
if isinstance(proc, list):
# Vol3 returns lists: [PID, PPID, ImageFileName, Offset, Threads, Handles, SessionId, Wow64, CreateTime, ExitTime, File output]
pid = proc[0] if len(proc) > 0 else 0
ppid = proc[1] if len(proc) > 1 else 0
name = (proc[2] if len(proc) > 2 else '').lower()
path = (proc[10] if len(proc) > 10 else '').lower()
else:
pid = proc.get('PID', proc.get('pid', 0))
ppid = proc.get('PPID', proc.get('ppid', 0))
name = proc.get('ImageFileName', proc.get('name', '')).lower()
path = proc.get('Path', proc.get('path', '')).lower()
pid_to_name[pid] = name
pid_to_ppid[pid] = ppid
pid_to_path[pid] = path
for cmd_entry in cmdline:
if isinstance(cmd_entry, list) and len(cmd_entry) >= 3:
pid_to_cmd[cmd_entry[0]] = cmd_entry[2] or ''
elif isinstance(cmd_entry, dict):
pid_to_cmd[cmd_entry.get('PID', 0)] = cmd_entry.get('Args', '')
# Check each process
for pid, name in pid_to_name.items():
path = pid_to_path.get(pid, '')
ppid = pid_to_ppid.get(pid, 0)
parent = pid_to_name.get(ppid, '').lower()
cmd = pid_to_cmd.get(pid, '')
# 1. Process in suspicious location
for sus_path in SUSPICIOUS_PATHS:
if sus_path in path.replace('\', '\\'):
if name in [k for k in WINDOWS_PROCESS_RULES.keys()]:
findings.append(Finding(
'CRITICAL', 'process_masquerade',
f'System process in suspicious location',
f'{name} (PID {pid}) running from: {path}',
pid, name
))
elif name.endswith('.exe'):
findings.append(Finding(
'HIGH', 'suspicious_path',
f'Executable in staging location',
f'{name} (PID {pid}) at {path}',
pid, name
))
# 2. Suspicious parent-child pairs
for parent_name, bad_children in SUSPICIOUS_PARENT_CHILD.items():
if parent == parent_name and name in bad_children:
findings.append(Finding(
'CRITICAL', 'suspicious_spawn',
f'Office/PDF app spawned shell',
f'{parent} (PPID {ppid}) spawned {name} (PID {pid})\nCmd: {cmd[:200]}',
pid, name
))
# 3. Encoded PowerShell
if name == 'powershell.exe' and cmd:
if any(enc in cmd.lower() for enc in ['-enc', '-encodedcommand', '-e ']):
findings.append(Finding(
'HIGH', 'encoded_powershell',
'PowerShell with encoded command',
f'PID {pid}: {cmd[:300]}',
pid, name
))
if any(sus in cmd.lower() for sus in [
'downloadstring', 'downloadfile', 'webclient',
'invoke-expression', 'iex ', 'frombase64'
]):
findings.append(Finding(
'HIGH', 'ps_download_cradle',
'PowerShell download cradle detected',
f'PID {pid}: {cmd[:300]}',
pid, name
))
# 4. WMI execution chain
if parent == 'wmiprvse.exe' and name in ['cmd.exe', 'powershell.exe',
'wscript.exe', 'cscript.exe']:
findings.append(Finding(
'HIGH', 'wmi_execution',
'WMI spawned command interpreter',
f'WmiPrvSE spawned {name} (PID {pid}): {cmd[:200]}',
pid, name
))
return findings
def check_malfind(malfind_results: List) -> List[Finding]:
"""Score and categorise malfind results."""
findings = []
for region in malfind_results:
if isinstance(region, list):
pid = region[0] if len(region) > 0 else 0
name = region[1] if len(region) > 1 else ''
start = region[3] if len(region) > 3 else 0
prot = region[5] if len(region) > 5 else ''
hexd = str(region[7]) if len(region) > 7 else ''
elif isinstance(region, dict):
pid = region.get('PID', region.get('Pid', 0))
name = region.get('Process', region.get('ImageFileName', ''))
start = region.get('Start', region.get('VadStart', 0))
prot = region.get('Protection', '')
hexd = str(region.get('Hexdump', region.get('Data', '')))
else:
continue
has_pe = hexd.strip().startswith('4d 5a') or hexd.strip().startswith('MZ')
is_rwx = 'EXECUTE_READ_WRITE' in str(prot)
if has_pe and is_rwx:
findings.append(Finding(
'CRITICAL', 'injection',
'PE file in RWX anonymous memory (reflective loading)',
f'PID {pid} ({name}): addr=0x{start:x} protection={prot}',
pid, name
))
elif has_pe:
findings.append(Finding(
'HIGH', 'injection',
'PE header in executable anonymous memory',
f'PID {pid} ({name}): addr=0x{start:x} protection={prot}',
pid, name
))
elif is_rwx:
findings.append(Finding(
'HIGH', 'injection',
'RWX anonymous memory region (shellcode staging)',
f'PID {pid} ({name}): addr=0x{start:x}',
pid, name
))
return findings
def check_network(netscan_results: List) -> List[Finding]:
"""Detect suspicious network activity."""
findings = []
internal = ['10.', '172.16.', '172.17.', '172.18.', '172.19.',
'172.20.', '172.21.', '172.22.', '172.23.', '172.24.',
'172.25.', '172.26.', '172.27.', '172.28.', '172.29.',
'172.30.', '172.31.', '192.168.', '127.', '0.0.0.0']
for conn in netscan_results:
if isinstance(conn, list):
proto = str(conn[0]) if len(conn) > 0 else ''
local = str(conn[1]) if len(conn) > 1 else ''
remote = str(conn[3]) if len(conn) > 3 else ''
rport = int(conn[4]) if len(conn) > 4 else 0
state = str(conn[5]) if len(conn) > 5 else ''
pid = int(conn[6]) if len(conn) > 6 else 0
name = str(conn[7]) if len(conn) > 7 else ''
elif isinstance(conn, dict):
remote = str(conn.get('ForeignAddr', conn.get('RemoteAddr', '')))
rport = int(conn.get('ForeignPort', conn.get('RemotePort', 0)))
state = str(conn.get('State', ''))
pid = int(conn.get('PID', conn.get('Pid', 0)))
name = str(conn.get('Owner', conn.get('Process', '')))
else:
continue
if 'ESTABLISHED' not in state:
continue
remote_ip = remote.split(':')[0] if ':' in remote else remote
is_external = not any(remote_ip.startswith(r) for r in internal)
if is_external:
if rport in SUSPICIOUS_PORTS:
findings.append(Finding(
'HIGH', 'suspicious_network',
f'Connection to external IP on known C2 port',
f'{name} (PID {pid}) -> {remote_ip}:{rport}',
pid, name
))
elif name.lower() in ['svchost.exe', 'lsass.exe', 'csrss.exe',
'winlogon.exe', 'services.exe']:
findings.append(Finding(
'HIGH', 'suspicious_network',
f'System process with external network connection',
f'{name} (PID {pid}) -> {remote_ip}:{rport}',
pid, name
))
return findings
def check_services(svcscan: List) -> List[Finding]:
"""Detect suspicious service configurations."""
findings = []
suspicious_paths = [r'\temp\', r'\tmp\', r'\appdata\',
r'\public\', r'\programdata\']
for svc in svcscan:
if isinstance(svc, list):
name = str(svc[0]) if len(svc) > 0 else ''
binary = str(svc[4]) if len(svc) > 4 else ''
state = str(svc[2]) if len(svc) > 2 else ''
elif isinstance(svc, dict):
name = str(svc.get('ServiceName', svc.get('Name', '')))
binary = str(svc.get('Binary', svc.get('Path', '')))
state = str(svc.get('State', ''))
else:
continue
binary_lower = binary.lower()
for sus in suspicious_paths:
if sus in binary_lower:
findings.append(Finding(
'HIGH', 'suspicious_service',
'Service binary in staging location',
f'Service: {name} | Binary: {binary}',
None, name
))
if 'powershell' in binary_lower or 'cmd.exe /c' in binary_lower:
findings.append(Finding(
'HIGH', 'suspicious_service',
'Service using interpreter as binary',
f'Service: {name} | Binary: {binary}',
None, name
))
return findings
def check_kernel_integrity(callbacks: List, modules: List,
driverscan: List) -> List[Finding]:
"""Detect kernel-level tampering indicators."""
findings = []
# Known legitimate callback registrants
known_callbacks = [
'ntoskrnl.exe', 'nt', 'win32k.sys', 'ndis.sys',
'tcpip.sys', 'fltmgr.sys', 'ci.dll',
]
for cb in callbacks:
if isinstance(cb, list):
callback_type = str(cb[0]) if len(cb) > 0 else ''
module = str(cb[2]) if len(cb) > 2 else ''
elif isinstance(cb, dict):
callback_type = str(cb.get('Type', ''))
module = str(cb.get('Module', ''))
else:
continue
module_lower = module.lower()
is_known = any(known in module_lower for known in known_callbacks)
if not is_known and module:
findings.append(Finding(
'MEDIUM', 'kernel_callback',
f'Unknown kernel callback registration',
f'Type: {callback_type} | Module: {module}',
))
# Check for hidden modules (in driverscan but not modules)
module_bases = set()
for mod in modules:
if isinstance(mod, list) and len(mod) > 1:
module_bases.add(str(mod[1]))
elif isinstance(mod, dict):
module_bases.add(str(mod.get('Base', '')))
for drv in driverscan:
if isinstance(drv, list) and len(drv) > 1:
base = str(drv[1])
name = str(drv[0]) if len(drv) > 0 else ''
elif isinstance(drv, dict):
base = str(drv.get('Offset', ''))
name = str(drv.get('Name', ''))
else:
continue
if base and base not in module_bases:
findings.append(Finding(
'HIGH', 'hidden_driver',
'Driver found by scan not in module list (possible rootkit)',
f'Name: {name} | Base: {base}',
))
return findings
def run_all_checks(plugin_results: Dict) -> Tuple[List[Finding], int]:
"""Run all anomaly checks and return findings with total risk score."""
all_findings = []
all_findings.extend(check_processes(
plugin_results.get('windows.pslist', []),
plugin_results.get('windows.pstree', []),
plugin_results.get('windows.cmdline', []),
))
all_findings.extend(check_malfind(
plugin_results.get('windows.malfind', []) +
plugin_results.get('linux.malfind', [])
))
all_findings.extend(check_network(
plugin_results.get('windows.netscan', []) +
plugin_results.get('linux.netstat', [])
))
all_findings.extend(check_services(
plugin_results.get('windows.svcscan', [])
))
all_findings.extend(check_kernel_integrity(
plugin_results.get('windows.callbacks', []),
plugin_results.get('windows.modules', []),
plugin_results.get('windows.driverscan', []),
))
total_score = sum(f.score for f in all_findings)
all_findings.sort(key=lambda f: f.score, reverse=True)
return all_findings, total_score
Phase 4: IOC extraction and Yara scanning
## File: phase4_iocs.py
## /opt/memory-hunter/scripts/phase4_iocs.py
import re
import subprocess
import json
import logging
from typing import Dict, List, Set
from pathlib import Path
log = logging.getLogger(__name__)
VOL3 = '/opt/vol3-env/bin/vol'
YARA_RULES = '/opt/memory-hunter/yara_rules/combined.yar'
# Patterns for IOC extraction
PATTERNS = {
'ipv4': re.compile(r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b'),
'url': re.compile(r'https?://[a-zA-Z0-9._/?=&%+-]{10,300}'),
'domain': re.compile(r'\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.){1,10}(?:com|net|org|io|co|xyz|top|tk|ru|cn|de|info|biz)\b', re.I),
'email': re.compile(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'),
'hash_md5': re.compile(r'\b[a-fA-F0-9]{32}\b'),
'hash_sha256': re.compile(r'\b[a-fA-F0-9]{64}\b'),
'registry_run':re.compile(r'SOFTWARE\(?:Microsoft\Windows\CurrentVersion\Run|Wow6432Node)[^"\'\]+', re.I),
'pipe': re.compile(r'\\\.\pipe\[a-zA-Z0-9_-]{4,}'),
'base64_large':re.compile(r'[A-Za-z0-9+/]{100,}={0,2}'),
}
INTERNAL_IPS = ['10.', '172.16.', '192.168.', '127.', '169.254.']
LEGIT_DOMAINS = ['microsoft.com', 'windows.com', 'windowsupdate.com',
'google.com', 'akamai.com', 'cloudflare.com',
'amazon.com', 'amazonaws.com']
def extract_strings_from_memory(image_path: str,
suspicious_pids: List[int]) -> str:
"""Extract strings from suspicious process memory regions."""
if not suspicious_pids:
return ''
pid_args = ['--pid', ','.join(str(p) for p in suspicious_pids[:10])]
cmd = [VOL3, '-f', image_path, '--renderer', 'json',
'windows.strings'] + pid_args
r = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
return r.stdout if r.returncode == 0 else ''
def run_yara_scan(image_path: str) -> List[Dict]:
"""Run Yara rules against process memory via Volatility."""
if not Path(YARA_RULES).exists():
log.warning("Yara rules not found - skipping Yara scan")
return []
cmd = [VOL3, '-f', image_path, '--renderer', 'json',
'windows.vadyarascan', '--yara-file', YARA_RULES]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
if r.returncode != 0:
return []
try:
data = json.loads(r.stdout)
rows = data.get('rows', data) if isinstance(data, dict) else data
return rows
except:
return []
def extract_iocs(plugin_results: Dict, image_path: str,
suspicious_pids: List[int]) -> Dict:
"""Extract all IOC types from plugin results and process memory strings."""
iocs: Dict[str, Set] = {k: set() for k in PATTERNS}
iocs['yara_hits'] = set()
# Extract from network connections
for conn in plugin_results.get('windows.netscan', []):
remote = ''
if isinstance(conn, list):
remote = str(conn[3]) if len(conn) > 3 else ''
elif isinstance(conn, dict):
remote = str(conn.get('ForeignAddr', ''))
remote_ip = remote.split(':')[0] if ':' in remote else remote
if remote_ip and not any(remote_ip.startswith(r) for r in INTERNAL_IPS):
iocs['ipv4'].add(remote_ip)
# Extract from command lines
for cmd_entry in plugin_results.get('windows.cmdline', []):
cmd_text = ''
if isinstance(cmd_entry, list) and len(cmd_entry) > 2:
cmd_text = str(cmd_entry[2])
elif isinstance(cmd_entry, dict):
cmd_text = str(cmd_entry.get('Args', ''))
for ioc_type, pattern in PATTERNS.items():
for match in pattern.findall(cmd_text):
if ioc_type == 'ipv4' and not any(match.startswith(r) for r in INTERNAL_IPS):
iocs[ioc_type].add(match)
elif ioc_type == 'url' and not any(d in match for d in LEGIT_DOMAINS):
iocs[ioc_type].add(match[:200])
elif ioc_type not in ('ipv4', 'url'):
iocs[ioc_type].add(match[:200])
# Extract from process memory strings
if suspicious_pids:
strings_output = extract_strings_from_memory(image_path, suspicious_pids)
for ioc_type, pattern in PATTERNS.items():
for match in pattern.findall(strings_output):
if ioc_type == 'url' and not any(d in match for d in LEGIT_DOMAINS):
iocs[ioc_type].add(match[:200])
elif ioc_type == 'ipv4' and not any(match.startswith(r) for r in INTERNAL_IPS):
iocs[ioc_type].add(match)
elif ioc_type == 'pipe' and 'pipe\' in match.lower():
iocs[ioc_type].add(match)
# Yara scan
yara_hits = run_yara_scan(image_path)
for hit in yara_hits:
if isinstance(hit, list):
rule = str(hit[2]) if len(hit) > 2 else ''
pid = str(hit[0]) if len(hit) > 0 else ''
proc = str(hit[1]) if len(hit) > 1 else ''
elif isinstance(hit, dict):
rule = str(hit.get('Rule', ''))
pid = str(hit.get('PID', ''))
proc = str(hit.get('Process', ''))
else:
continue
if rule:
iocs['yara_hits'].add(f"{rule} (PID {pid} / {proc})")
log.warning(f"YARA HIT: {rule} in PID {pid} ({proc})")
return {k: sorted(list(v)) for k, v in iocs.items()}
Phase 5: report generation (HTML and JSON)
## File: phase5_report.py
#!/usr/bin/env python3
"""
Phase 5: Generate analyst-ready reports in a clean folder structure.
Output per run:
reports/<image>_<timestamp>/
report.html - full visual report with navigation, all data shown
report.json - structured data for SIEM/automation
summary.txt - plain text summary for terminal/tickets
iocs.txt - extracted IOCs ready for blocking
dumps/ - any carved/dumped files from memory
"""
import json
import logging
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from jinja2 import Environment
log = logging.getLogger(__name__)
REPORT_BASE = Path('/opt/memory-hunter/reports')
# ── HTML template ─────────────────────────────────────────────────────────────
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<title>Memory Analysis: {{ meta.filename }}</title>
<style>
*{box-sizing:border-box;margin:0;padding:0}
:root{
--bg:#0f172a;--surface:#1e293b;--surface2:#263248;--border:#334155;
--text:#e2e8f0;--text2:#94a3b8;--text3:#64748b;
--amber:#f59e0b;--red:#ef4444;--orange:#f97316;
--yellow:#eab308;--green:#22c55e;--blue:#3b82f6;
--mono:'JetBrains Mono','Fira Mono',monospace;
}
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;
background:var(--bg);color:var(--text);font-size:13px;line-height:1.5}
/* ── NAV ── */
.topbar{background:#0a1120;border-bottom:1px solid var(--border);
padding:0 24px;position:sticky;top:0;z-index:100;
display:flex;align-items:center;gap:0;height:48px}
.topbar-brand{font-family:var(--mono);font-size:12px;color:var(--amber);
font-weight:700;margin-right:24px;white-space:nowrap}
.nav-links{display:flex;align-items:stretch;gap:0;overflow-x:auto;flex:1}
.nav-link{color:var(--text2);text-decoration:none;font-size:11px;
padding:0 14px;display:flex;align-items:center;
border-bottom:2px solid transparent;white-space:nowrap;
transition:all 0.15s}
.nav-link:hover{color:var(--text);border-bottom-color:var(--amber)}
.nav-link .badge{background:var(--surface);color:var(--text3);
font-family:var(--mono);font-size:9px;
padding:1px 5px;border-radius:3px;margin-left:5px}
.nav-link.critical .badge{background:#450a0a;color:var(--red)}
.nav-link.has-data .badge{background:#1e3a5f;color:#60a5fa}
/* ── HEADER ── */
.header{background:linear-gradient(135deg,#0a1120 0%,#1a1f35 100%);
padding:24px 32px;border-bottom:1px solid var(--border)}
.header-top{display:flex;align-items:flex-start;justify-content:space-between;gap:16px}
.header-title{font-family:var(--mono);font-size:15px;font-weight:700;
color:var(--text)}
.header-meta{font-family:var(--mono);font-size:10px;color:var(--text3);
margin-top:6px;display:flex;flex-wrap:wrap;gap:16px}
.header-meta span{color:var(--text2)}
.risk-pill{padding:6px 18px;border-radius:20px;font-weight:800;
font-size:13px;font-family:var(--mono);white-space:nowrap}
.risk-CRITICAL{background:#450a0a;color:var(--red);border:1px solid var(--red)}
.risk-HIGH {background:#431407;color:var(--orange);border:1px solid var(--orange)}
.risk-MEDIUM {background:#422006;color:var(--yellow);border:1px solid var(--yellow)}
.risk-LOW {background:#052e16;color:var(--green);border:1px solid var(--green)}
/* ── STAT CARDS ── */
.stats{display:flex;gap:10px;padding:20px 32px;
background:var(--surface);border-bottom:1px solid var(--border);
overflow-x:auto}
.stat{background:var(--bg);border:1px solid var(--border);border-radius:8px;
padding:12px 18px;text-align:center;min-width:80px}
.stat-val{font-size:22px;font-weight:800;font-family:var(--mono)}
.stat-lbl{font-size:9px;color:var(--text3);text-transform:uppercase;
letter-spacing:.1em;margin-top:3px}
.c-critical{color:var(--red)}.c-high{color:var(--orange)}
.c-medium{color:var(--yellow)}.c-low{color:var(--green)}
.c-blue{color:var(--blue)}.c-amber{color:var(--amber)}
/* ── SECTIONS ── */
.container{padding:0 32px 40px}
.section{margin-top:24px;scroll-margin-top:56px}
.section-header{display:flex;align-items:center;gap:10px;
padding:12px 16px;background:var(--surface);
border:1px solid var(--border);border-radius:8px 8px 0 0;
border-bottom:none}
.section-title{font-family:var(--mono);font-size:12px;font-weight:700;
color:var(--amber)}
.section-count{background:var(--bg);color:var(--text3);
font-family:var(--mono);font-size:9px;
padding:2px 7px;border-radius:3px;border:1px solid var(--border)}
.section-body{background:var(--surface);border:1px solid var(--border);
border-radius:0 0 8px 8px;overflow:hidden}
/* ── FINDINGS ── */
.finding{padding:12px 16px;border-bottom:1px solid var(--border)}
.finding:last-child{border-bottom:none}
.finding-row{display:flex;align-items:flex-start;gap:10px}
.sev{padding:2px 8px;border-radius:4px;font-size:9px;font-weight:700;
font-family:var(--mono);white-space:nowrap;margin-top:1px}
.sev-CRITICAL{background:#450a0a;color:var(--red);border:1px solid var(--red)}
.sev-HIGH {background:#431407;color:var(--orange);border:1px solid var(--orange)}
.sev-MEDIUM {background:#422006;color:var(--yellow);border:1px solid var(--yellow)}
.sev-LOW {background:#052e16;color:var(--green);border:1px solid var(--green)}
.finding-title{font-weight:600;font-size:12px}
.finding-proc{color:var(--text3);font-family:var(--mono);font-size:10px;margin-left:4px}
.finding-detail{font-family:var(--mono);font-size:10px;color:var(--text2);
background:var(--bg);padding:6px 10px;border-radius:4px;
margin-top:6px;white-space:pre-wrap;word-break:break-all;
border-left:2px solid var(--border)}
/* ── TABLES ── */
.tbl{width:100%;border-collapse:collapse}
.tbl th{text-align:left;padding:7px 12px;font-size:9px;color:var(--text3);
text-transform:uppercase;letter-spacing:.08em;
background:var(--surface2);border-bottom:1px solid var(--border);
position:sticky;top:48px}
.tbl td{padding:7px 12px;border-bottom:1px solid #1a2336;
font-family:var(--mono);font-size:10px;vertical-align:top}
.tbl tr:last-child td{border-bottom:none}
.tbl tr:hover td{background:rgba(255,255,255,0.02)}
.tag-ext{display:inline-block;background:#450a0a;color:var(--red);
padding:0 5px;border-radius:3px;font-size:8px;margin-left:4px}
.tag-sus{display:inline-block;background:#422006;color:var(--orange);
padding:0 5px;border-radius:3px;font-size:8px;margin-left:4px}
.tag-ok {display:inline-block;background:#052e16;color:var(--green);
padding:0 5px;border-radius:3px;font-size:8px;margin-left:4px}
.tbl-wrap{max-height:500px;overflow-y:auto}
/* ── IOC GRID ── */
.ioc-grid{display:grid;grid-template-columns:repeat(auto-fill,minmax(280px,1fr));
gap:12px;padding:16px}
.ioc-group{}
.ioc-label{font-size:9px;font-family:var(--mono);text-transform:uppercase;
letter-spacing:.1em;color:var(--text3);margin-bottom:6px}
.ioc-val{font-family:var(--mono);font-size:10px;color:#60a5fa;
background:#0c1e35;padding:3px 8px;border-radius:3px;
margin-bottom:2px;word-break:break-all;
border-left:2px solid #1e3a5f}
.ioc-empty{color:var(--text3);font-size:11px;padding:16px;font-style:italic}
/* ── SCROLLABLE TABLES ── */
.scroll-hint{font-size:9px;color:var(--text3);padding:4px 12px;
background:var(--surface2);text-align:right;
font-family:var(--mono)}
/* ── EMPTY STATE ── */
.empty{padding:24px;text-align:center;color:var(--text3);font-size:11px}
/* ── YARA HITS ── */
.yara-hit{padding:10px 16px;border-bottom:1px solid var(--border);
font-family:var(--mono);font-size:11px;color:var(--red)}
.yara-hit:last-child{border-bottom:none}
@media(max-width:700px){
.stats{flex-wrap:wrap}
.container{padding:0 12px 40px}
.header{padding:16px}
.topbar{padding:0 12px}
}
</style>
</head>
<body>
<!-- ── TOP NAVIGATION ── -->
<div class="topbar">
<div class="topbar-brand">// memory-hunter</div>
<nav class="nav-links">
<a class="nav-link {{ 'critical' if risk_label == 'CRITICAL' else 'has-data' if findings else '' }}"
href="#findings">Findings<span class="badge">{{ findings|length }}</span></a>
<a class="nav-link {{ 'has-data' if iocs.yara_hits else '' }}"
href="#yara">Yara<span class="badge">{{ iocs.yara_hits|length }}</span></a>
<a class="nav-link {{ 'has-data' if iocs.ipv4 or iocs.url else '' }}"
href="#iocs">IOCs<span class="badge">{{ (iocs.ipv4|length) + (iocs.url|length) + (iocs.domain|length) }}</span></a>
<a class="nav-link {{ 'has-data' if network else '' }}"
href="#network">Network<span class="badge">{{ network|length }}</span></a>
<a class="nav-link {{ 'has-data' if processes else '' }}"
href="#processes">Processes<span class="badge">{{ processes|length }}</span></a>
<a class="nav-link {{ 'has-data' if services else '' }}"
href="#services">Services<span class="badge">{{ services|length }}</span></a>
<a class="nav-link {{ 'has-data' if scheduled_tasks else '' }}"
href="#tasks">Sched Tasks<span class="badge">{{ scheduled_tasks|length }}</span></a>
<a class="nav-link {{ 'has-data' if modules else '' }}"
href="#modules">Modules<span class="badge">{{ modules|length }}</span></a>
<a class="nav-link {{ 'has-data' if drivers else '' }}"
href="#drivers">Drivers<span class="badge">{{ drivers|length }}</span></a>
<a class="nav-link {{ 'has-data' if handles else '' }}"
href="#handles">Handles<span class="badge">{{ handles|length }}</span></a>
<a class="nav-link {{ 'has-data' if ie_history else '' }}"
href="#iehistory">IE History<span class="badge">{{ ie_history|length }}</span></a>
<a class="nav-link {{ 'has-data' if prefetch else '' }}"
href="#prefetch">Prefetch<span class="badge">{{ prefetch|length }}</span></a>
<a class="nav-link {{ 'has-data' if shimcache else '' }}"
href="#shimcache">Shimcache<span class="badge">{{ shimcache|length }}</span></a>
<a class="nav-link {{ 'has-data' if mft else '' }}"
href="#mft">MFT<span class="badge">{{ mft|length }}</span></a>
<a class="nav-link {{ 'has-data' if envars else '' }}"
href="#envars">Envars<span class="badge">{{ envars|length }}</span></a>
<a class="nav-link {{ 'has-data' if cmdline else '' }}"
href="#cmdline">CmdLine<span class="badge">{{ cmdline|length }}</span></a>
</nav>
</div>
<!-- ── HEADER ── -->
<div class="header">
<div class="header-top">
<div>
<div class="header-title">{{ meta.filename }}</div>
<div class="header-meta">
<span>SHA256: {{ meta.sha256[:16] }}...{{ meta.sha256[-8:] }}</span>
<span>{{ meta.size_gb }} GB</span>
<span>OS: {{ meta.os_version or 'Unknown' }}</span>
{% if meta.vol2_profile %}<span>Vol2: {{ meta.vol2_profile }}</span>{% endif %}
<span>{{ generated_at }}</span>
</div>
</div>
<div class="risk-pill risk-{{ risk_label }}">{{ risk_label }}</div>
</div>
</div>
<!-- ── STAT CARDS ── -->
<div class="stats">
<div class="stat"><div class="stat-val c-critical">{{ summary.critical }}</div><div class="stat-lbl">Critical</div></div>
<div class="stat"><div class="stat-val c-high">{{ summary.high }}</div><div class="stat-lbl">High</div></div>
<div class="stat"><div class="stat-val c-medium">{{ summary.medium }}</div><div class="stat-lbl">Medium</div></div>
<div class="stat"><div class="stat-val c-amber">{{ summary.yara_hits }}</div><div class="stat-lbl">Yara Hits</div></div>
<div class="stat"><div class="stat-val c-blue">{{ network|length }}</div><div class="stat-lbl">Net Conns</div></div>
<div class="stat"><div class="stat-val">{{ processes|length }}</div><div class="stat-lbl">Processes</div></div>
<div class="stat"><div class="stat-val">{{ modules|length }}</div><div class="stat-lbl">Modules</div></div>
<div class="stat"><div class="stat-val">{{ drivers|length }}</div><div class="stat-lbl">Drivers</div></div>
<div class="stat"><div class="stat-val">{{ handles|length }}</div><div class="stat-lbl">Handles</div></div>
<div class="stat"><div class="stat-val c-blue">{{ (iocs.ipv4|length)+(iocs.url|length)+(iocs.domain|length) }}</div><div class="stat-lbl">Net IOCs</div></div>
<div class="stat"><div class="stat-val">{{ risk_score }}</div><div class="stat-lbl">Risk Score</div></div>
</div>
<div class="container">
<!-- ── FINDINGS ── -->
<div class="section" id="findings">
<div class="section-header">
<span class="section-title">// threat findings</span>
<span class="section-count">{{ findings|length }}</span>
</div>
<div class="section-body">
{% if findings %}
{% for f in findings %}
<div class="finding">
<div class="finding-row">
<span class="sev sev-{{ f.severity }}">{{ f.severity }}</span>
<span class="finding-title">{{ f.title }}</span>
{% if f.process %}<span class="finding-proc">{{ f.process }}{% if f.pid %} (PID {{ f.pid }}){% endif %}</span>{% endif %}
</div>
{% if f.detail %}<div class="finding-detail">{{ f.detail }}</div>{% endif %}
</div>
{% endfor %}
{% else %}
<div class="empty">No threat findings detected</div>
{% endif %}
</div>
</div>
<!-- ── YARA ── -->
<div class="section" id="yara">
<div class="section-header">
<span class="section-title">// yara rule matches</span>
<span class="section-count">{{ iocs.yara_hits|length }}</span>
</div>
<div class="section-body">
{% if iocs.yara_hits %}
{% for hit in iocs.yara_hits %}
<div class="yara-hit">{{ hit }}</div>
{% endfor %}
{% else %}
<div class="empty">No Yara matches</div>
{% endif %}
</div>
</div>
<!-- ── IOCs ── -->
<div class="section" id="iocs">
<div class="section-header">
<span class="section-title">// extracted iocs</span>
<span class="section-count">{{ summary.ioc_count }}</span>
</div>
<div class="section-body">
{% set ioc_type_labels = {
'ipv4': 'IP Addresses', 'url': 'URLs', 'domain': 'Domains',
'pipe': 'Named Pipes', 'registry_run': 'Registry Run Keys',
'hash_sha256': 'SHA256 Hashes', 'hash_md5': 'MD5 Hashes',
'email': 'Email Addresses', 'base64_large': 'Base64 Data'
} %}
{% set has_iocs = namespace(v=false) %}
{% for ioc_type, values in iocs.items() %}
{% if values and ioc_type != 'yara_hits' %}{% set has_iocs.v = true %}{% endif %}
{% endfor %}
{% if has_iocs.v %}
<div class="ioc-grid">
{% for ioc_type, values in iocs.items() %}
{% if values and ioc_type != 'yara_hits' %}
<div class="ioc-group">
<div class="ioc-label">{{ ioc_type_labels.get(ioc_type, ioc_type.replace('_',' ')) }} ({{ values|length }})</div>
{% for v in values %}
<div class="ioc-val">{{ v }}</div>
{% endfor %}
</div>
{% endif %}
{% endfor %}
</div>
{% else %}
<div class="ioc-empty">No IOCs extracted</div>
{% endif %}
</div>
</div>
<!-- ── NETWORK CONNECTIONS ── -->
<div class="section" id="network">
<div class="section-header">
<span class="section-title">// network connections</span>
<span class="section-count">{{ network|length }}</span>
</div>
<div class="section-body">
{% if network %}
<div class="scroll-hint">scroll to view all rows</div>
<div class="tbl-wrap">
<table class="tbl">
<tr><th>PID</th><th>Process</th><th>Local</th><th>Remote IP</th><th>Port</th><th>State</th></tr>
{% for c in network %}
{% if c is mapping %}
{% set remote = c.get('ForeignAddr', c.get('Remote', c.get('ForeignAddress', ''))) %}
<tr>
<td>{{ c.get('PID', c.get('Pid', c.get('pid', ''))) }}</td>
<td>{{ c.get('Owner', c.get('ImageFileName', c.get('Process', c.get('name', '')))) }}</td>
<td>{{ c.get('LocalAddr', c.get('Local', '')) }}</td>
<td>{{ remote }}{% if remote and not remote.startswith(('10.','192.168.','172.','127.','0.','*')) %}<span class="tag-ext">EXT</span>{% endif %}</td>
<td>{{ c.get('ForeignPort', c.get('Port', '')) }}</td>
<td>{{ c.get('State', c.get('state', '')) }}</td>
</tr>
{% else %}
<tr>
<td>{{ c[6] if c|length > 6 else '' }}</td>
<td>{{ c[7] if c|length > 7 else '' }}</td>
<td>{{ c[1] if c|length > 1 else '' }}</td>
{% set r = c[3] if c|length > 3 else '' %}
<td>{{ r }}{% if r and not r.startswith(('10.','192.168.','172.','127.','0.','*')) %}<span class="tag-ext">EXT</span>{% endif %}</td>
<td>{{ c[4] if c|length > 4 else '' }}</td>
<td>{{ c[5] if c|length > 5 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No network connections</div>{% endif %}
</div>
</div>
<!-- ── PROCESS LIST ── -->
<div class="section" id="processes">
<div class="section-header">
<span class="section-title">// process list</span>
<span class="section-count">{{ processes|length }}</span>
</div>
<div class="section-body">
{% if processes %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>PID</th><th>PPID</th><th>Name</th><th>Path / Command</th><th>Created</th></tr>
{% for p in processes %}
{% if p is mapping %}
<tr>
<td>{{ p.get('PID', p.get('Pid','')) }}</td>
<td>{{ p.get('PPID', p.get('PPid','')) }}</td>
<td><b>{{ p.get('ImageFileName', p.get('Name','')) }}</b></td>
<td style="max-width:500px;overflow:hidden;text-overflow:ellipsis">
{{ p.get('Path', p.get('Exe', p.get('CommandLine', ''))) }}</td>
<td>{{ p.get('CreateTime', p.get('create_time','')) }}</td>
</tr>
{% else %}
<tr>
<td>{{ p[0] if p|length > 0 else '' }}</td>
<td>{{ p[1] if p|length > 1 else '' }}</td>
<td><b>{{ p[2] if p|length > 2 else '' }}</b></td>
<td style="max-width:500px">{{ p[10] if p|length > 10 else p[3] if p|length > 3 else '' }}</td>
<td>{{ p[8] if p|length > 8 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No process data</div>{% endif %}
</div>
</div>
<!-- ── SERVICES ── -->
<div class="section" id="services">
<div class="section-header">
<span class="section-title">// windows services</span>
<span class="section-count">{{ services|length }}</span>
</div>
<div class="section-body">
{% if services %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Name</th><th>State</th><th>Type</th><th>Binary Path</th></tr>
{% for s in services %}
{% if s is mapping %}
<tr>
<td>{{ s.get('ServiceName', s.get('Name','')) }}</td>
<td>{{ s.get('State','') }}</td>
<td>{{ s.get('Type', s.get('ServiceType','')) }}</td>
<td>{{ s.get('Binary', s.get('Path', s.get('ImagePath',''))) }}</td>
</tr>
{% else %}
<tr>
<td>{{ s[0] if s|length > 0 else '' }}</td>
<td>{{ s[2] if s|length > 2 else '' }}</td>
<td>{{ s[1] if s|length > 1 else '' }}</td>
<td>{{ s[4] if s|length > 4 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No service data</div>{% endif %}
</div>
</div>
<!-- ── SCHEDULED TASKS ── -->
<div class="section" id="tasks">
<div class="section-header">
<span class="section-title">// scheduled tasks</span>
<span class="section-count">{{ scheduled_tasks|length }}</span>
</div>
<div class="section-body">
{% if scheduled_tasks %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Name</th><th>Status</th><th>Command</th><th>Arguments</th></tr>
{% for t in scheduled_tasks %}
<tr>
{% if t is mapping %}
<td>{{ t.get('Name', t.get('TaskName', t.get('name',''))) }}</td>
<td>{{ t.get('Status', t.get('Enabled', t.get('state',''))) }}</td>
<td>{{ t.get('Command', t.get('Action', t.get('cmd',''))) }}</td>
<td style="word-break:break-all">{{ t.get('Arguments', t.get('Args', t.get('args',''))) }}</td>
{% else %}
<td colspan="4" style="white-space:pre-wrap;font-size:10px">{{ t }}</td>
<td></td><td></td><td></td>
{% endif %}
</tr>
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No scheduled tasks</div>{% endif %}
</div>
</div>
<!-- ── MODULES ── -->
<div class="section" id="modules">
<div class="section-header">
<span class="section-title">// loaded kernel modules</span>
<span class="section-count">{{ modules|length }}</span>
</div>
<div class="section-body">
{% if modules %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Name</th><th>Base</th><th>Size</th><th>Path</th></tr>
{% for m in modules %}
{% if m is mapping %}
<tr>
<td>{{ m.get('Name', m.get('BaseDllName','')) }}</td>
<td style="font-family:var(--mono)">{{ '0x%x' % m.get('Base',0) if m.get('Base') else m.get('Offset','') }}</td>
<td>{{ m.get('Size','') }}</td>
<td>{{ m.get('Path', m.get('FullDllName','')) }}</td>
</tr>
{% else %}
<tr>
<td>{{ m[1] if m|length > 1 else '' }}</td>
<td>{{ m[0] if m|length > 0 else '' }}</td>
<td>{{ m[2] if m|length > 2 else '' }}</td>
<td>{{ m[3] if m|length > 3 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No module data</div>{% endif %}
</div>
</div>
<!-- ── DRIVERS ── -->
<div class="section" id="drivers">
<div class="section-header">
<span class="section-title">// driver scan</span>
<span class="section-count">{{ drivers|length }}</span>
</div>
<div class="section-body">
{% if drivers %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Name</th><th>Base/Offset</th><th>Size</th><th>Service Key</th></tr>
{% for d in drivers %}
{% if d is mapping %}
<tr>
<td>{{ d.get('Name', d.get('DriverName','')) }}</td>
<td style="font-family:var(--mono)">{{ d.get('Offset', d.get('Base','')) }}</td>
<td>{{ d.get('Size','') }}</td>
<td>{{ d.get('ServiceKey', d.get('DriverServiceName','')) }}</td>
</tr>
{% else %}
<tr>
<td>{{ d[0] if d|length > 0 else '' }}</td>
<td>{{ d[1] if d|length > 1 else '' }}</td>
<td>{{ d[2] if d|length > 2 else '' }}</td>
<td>{{ d[3] if d|length > 3 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No driver data</div>{% endif %}
</div>
</div>
<!-- ── HANDLES ── -->
<div class="section" id="handles">
<div class="section-header">
<span class="section-title">// open handles</span>
<span class="section-count">{{ handles|length }}{% if handles|length == 200 %} (capped - see JSON for all){% endif %}</span>
</div>
<div class="section-body">
{% if handles %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>PID</th><th>Process</th><th>Type</th><th>Handle</th><th>Name</th></tr>
{% for h in handles %}
{% if h is mapping %}
<tr>
<td>{{ h.get('PID', h.get('Pid','')) }}</td>
<td>{{ h.get('Process','') }}</td>
<td>{{ h.get('Type','') }}</td>
<td>{{ h.get('HandleValue', h.get('Handle','')) }}</td>
<td>{{ h.get('Name','') }}</td>
</tr>
{% else %}
<tr>
<td>{{ h[0] if h|length > 0 else '' }}</td>
<td>{{ h[1] if h|length > 1 else '' }}</td>
<td>{{ h[2] if h|length > 2 else '' }}</td>
<td>{{ h[3] if h|length > 3 else '' }}</td>
<td>{{ h[4] if h|length > 4 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No handle data</div>{% endif %}
</div>
</div>
<!-- ── IE HISTORY ── -->
<div class="section" id="iehistory">
<div class="section-header">
<span class="section-title">// ie history (vol2)</span>
<span class="section-count">{{ ie_history|length }}</span>
</div>
<div class="section-body">
{% if ie_history %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Process</th><th>URL</th><th>Modified</th><th>Accessed</th></tr>
{% for r in ie_history %}
<tr>
{% if r is mapping %}
<td>{{ r.get('Process', r.get('process','')) }}</td>
<td>{{ r.get('URL', r.get('url','')) }}</td>
<td>{{ r.get('Modified','') }}</td>
<td>{{ r.get('Accessed','') }}</td>
{% else %}
<td colspan="4" style="white-space:pre-wrap">{{ r }}</td>
{% endif %}
</tr>
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No IE history (requires Vol2 iehistory plugin)</div>{% endif %}
</div>
</div>
<!-- ── PREFETCH ── -->
<div class="section" id="prefetch">
<div class="section-header">
<span class="section-title">// prefetch (vol2)</span>
<span class="section-count">{{ prefetch|length }}</span>
</div>
<div class="section-body">
{% if prefetch %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Application</th><th>Last Run</th><th>Run Count</th><th>Path</th></tr>
{% for r in prefetch %}
<tr>
{% if r is mapping %}
<td>{{ r.get('Application', r.get('Executable','')) }}</td>
<td>{{ r.get('LastRun', r.get('last_run','')) }}</td>
<td>{{ r.get('RunCount', r.get('run_count','')) }}</td>
<td>{{ r.get('Path','') }}</td>
{% else %}
<td colspan="4" style="white-space:pre-wrap;font-size:10px">{{ r }}</td>
{% endif %}
</tr>
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No prefetch data (requires Vol2 prefetchparser plugin)</div>{% endif %}
</div>
</div>
<!-- ── SHIMCACHE ── -->
<div class="section" id="shimcache">
<div class="section-header">
<span class="section-title">// shimcache (vol2)</span>
<span class="section-count">{{ shimcache|length }}</span>
</div>
<div class="section-body">
{% if shimcache %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>Last Modified</th><th>Last Update</th><th>Path</th></tr>
{% for r in shimcache %}
<tr>
{% if r is mapping %}
<td>{{ r.get('Last Modified','') }}</td>
<td>{{ r.get('Last Update','') }}</td>
<td>{{ r.get('Path','') }}</td>
{% else %}
<td colspan="3" style="white-space:pre-wrap;font-size:10px">{{ r }}</td>
{% endif %}
</tr>
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No shimcache data (requires Vol2 shimcache plugin)</div>{% endif %}
</div>
</div>
<!-- ── MFT ── -->
<div class="section" id="mft">
<div class="section-header">
<span class="section-title">// mft entries</span>
<span class="section-count">{{ mft_count }}</span>
</div>
<div class="section-body">
<div style="padding:16px;font-family:var(--mono);font-size:11px;color:var(--text2)">
{% if mft_count > 0 %}
MFT data ({{ mft_count }} entries) is too large to render in the browser.<br>
Full MFT written to: <span style="color:var(--amber)">mft.txt</span> in this report folder.<br><br>
To search: <span style="color:var(--green)">grep -i "filename.exe" mft.txt</span>
{% else %}
No MFT data collected.
{% endif %}
</div>
</div>
</div>
<!-- ── ENVIRONMENT VARIABLES ── -->
<div class="section" id="envars">
<div class="section-header">
<span class="section-title">// environment variables</span>
<span class="section-count">{{ envars|length }}</span>
</div>
<div class="section-body">
{% if envars %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>PID</th><th>Process</th><th>Variable</th><th>Value</th></tr>
{% for e in envars %}
{% if e is mapping %}
<tr>
<td>{{ e.get('PID', e.get('Pid','')) }}</td>
<td>{{ e.get('Process', e.get('Name','')) }}</td>
<td>{{ e.get('Variable', e.get('Key','')) }}</td>
<td style="max-width:400px;overflow:hidden;text-overflow:ellipsis">{{ e.get('Value','') }}</td>
</tr>
{% else %}
<tr>
<td>{{ e[0] if e|length > 0 else '' }}</td>
<td>{{ e[1] if e|length > 1 else '' }}</td>
<td>{{ e[2] if e|length > 2 else '' }}</td>
<td>{{ e[3] if e|length > 3 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No environment variable data</div>{% endif %}
</div>
</div>
<!-- ── COMMAND LINES ── -->
<div class="section" id="cmdline">
<div class="section-header">
<span class="section-title">// command line arguments</span>
<span class="section-count">{{ cmdline|length }}</span>
</div>
<div class="section-body">
{% if cmdline %}
<div class="tbl-wrap">
<table class="tbl">
<tr><th>PID</th><th>Process</th><th>Command Line</th></tr>
{% for c in cmdline %}
{% if c is mapping %}
<tr>
<td>{{ c.get('PID', c.get('Pid','')) }}</td>
<td>{{ c.get('Process', c.get('Name','')) }}</td>
<td style="white-space:pre-wrap;word-break:break-all">{{ c.get('Args', c.get('CommandLine', c.get('CmdLine',''))) }}</td>
</tr>
{% else %}
<tr>
<td>{{ c[0] if c|length > 0 else '' }}</td>
<td>{{ c[1] if c|length > 1 else '' }}</td>
<td style="white-space:pre-wrap;word-break:break-all">{{ c[2] if c|length > 2 else '' }}</td>
</tr>
{% endif %}
{% endfor %}
</table>
</div>
{% else %}<div class="empty">No command line data</div>{% endif %}
</div>
</div>
</div><!-- /container -->
</body>
</html>"""
def _row_type(rows):
if not rows:
return "empty"
r = rows[0]
if hasattr(r, 'get'):
return f"dict ({r.__class__.__name__}): keys={list(r.keys())[:4]}"
try:
return f"list (len={len(r)})"
except Exception:
return f"unknown: {r.__class__.__name__}"
def _get(plugin_results: Dict, *keys):
"""Get plugin results by multiple possible key names, return first non-empty."""
for k in keys:
v = plugin_results.get(k)
if v:
return v
return []
def _parse_vol2_text(raw: str, min_cols: int = 2) -> List:
"""
Parse Vol2 text output into a list of dicts using the header row as keys.
Falls back to returning non-empty stripped lines as strings.
"""
if not raw or not raw.strip():
return []
lines = [l for l in raw.splitlines() if l.strip()]
if len(lines) < 2:
return [l.strip() for l in lines if l.strip()]
# Try to use first line as header
header = lines[0]
# Vol2 headers use spaces to separate - detect by checking for all-caps words
import re
cols = re.split(r'\s{2,}', header.strip())
if len(cols) >= min_cols and any(c.isupper() or c[0].isupper() for c in cols if c):
result = []
for line in lines[1:]:
if line.startswith('-') or line.startswith('*'):
continue
parts = re.split(r'\s{2,}', line.strip())
row = {}
for i, col in enumerate(cols):
row[col.strip()] = parts[i].strip() if i < len(parts) else ''
result.append(row)
return result
# Fallback: return lines as strings
return [l.strip() for l in lines if l.strip() and not l.startswith('Volatility')]
def generate_reports(meta: Dict, findings: List, iocs: Dict,
plugin_results: Dict) -> Dict:
"""Generate all report files in a structured output folder."""
risk_score = sum(f.score for f in findings)
risk_label = (
'CRITICAL' if risk_score >= 60 else
'HIGH' if risk_score >= 30 else
'MEDIUM' if risk_score >= 10 else
'LOW'
)
image_stem = Path(meta['filename']).stem
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_dir = REPORT_BASE / f"{image_stem}_{timestamp}"
dumps_dir = report_dir / 'dumps'
report_dir.mkdir(parents=True, exist_ok=True)
dumps_dir.mkdir(exist_ok=True)
findings_dicts = [f.to_dict() for f in findings]
# ── Move any existing dump files into dumps/ ──────────────────────────────
# Files written by dumpfiles or carving tools in the working directory
import glob, os
for pattern in ['*.dmp', '*.img', '*.dat', '*.vacb', 'file.*.img', 'file.*.vacb', '*.raw.dir']:
for f in glob.glob(f'/opt/memory-hunter/{pattern}'):
try:
shutil.move(f, dumps_dir / Path(f).name)
log.info(f"Moved dump file: {Path(f).name} -> dumps/")
except Exception:
pass
# ── Resolve all plugin data with fallbacks ────────────────────────────────
network_rows = _get(plugin_results, 'windows.netscan', 'netscan', 'linux.netstat')
process_rows = _get(plugin_results, 'windows.pslist', 'pslist', 'linux.pslist')
services_raw = _get(plugin_results, 'windows.svcscan', 'svcscan')
modules_raw = _get(plugin_results, 'windows.modules', 'modules', 'linux.lsmod')
drivers_raw = _get(plugin_results, 'windows.driverscan', 'driverscan')
handles_raw = _get(plugin_results, 'windows.handles', 'handles')
cmdline_raw = _get(plugin_results, 'windows.cmdline', 'cmdline')
mft_raw = _get(plugin_results, 'windows.mftscan', 'mftparser')
envars_raw = _get(plugin_results, 'windows.envars', 'envars')
tasks_raw = _get(plugin_results, 'windows.scheduled_tasks', 'scheduled_tasks')
# Vol2-only plugins return text - parse to structured rows
ie_history_raw = _get(plugin_results, 'iehistory')
prefetch_raw = _get(plugin_results, 'prefetchparser')
shimcache_raw = _get(plugin_results, 'shimcache')
ie_history = ie_history_raw if isinstance(ie_history_raw, list) else _parse_vol2_text(str(ie_history_raw))
prefetch = prefetch_raw if isinstance(prefetch_raw, list) else _parse_vol2_text(str(prefetch_raw))
shimcache = shimcache_raw if isinstance(shimcache_raw, list) else _parse_vol2_text(str(shimcache_raw))
log.info(f"network:{len(network_rows)} processes:{len(process_rows)} "
f"services:{len(services_raw)} modules:{len(modules_raw)} "
f"drivers:{len(drivers_raw)} handles:{len(handles_raw)}")
log.info(f"ie_history:{len(ie_history)} prefetch:{len(prefetch)} "
f"shimcache:{len(shimcache)} mft:{len(mft_raw)}")
summary_data = {
'critical': sum(1 for f in findings if f.severity == 'CRITICAL'),
'high': sum(1 for f in findings if f.severity == 'HIGH'),
'medium': sum(1 for f in findings if f.severity == 'MEDIUM'),
'yara_hits': len(iocs.get('yara_hits', [])),
'ioc_count': sum(len(v) for v in iocs.values()),
}
# ── report.json ───────────────────────────────────────────────────────────
json_data = {
'meta': meta,
'generated_at': datetime.now().isoformat(),
'risk_label': risk_label,
'risk_score': risk_score,
'findings': findings_dicts,
'iocs': iocs,
'summary': summary_data,
'network': network_rows[:1000],
'processes': process_rows[:1000],
'services': services_raw[:1000],
'modules': modules_raw[:500],
'drivers': drivers_raw[:500],
'scheduled_tasks':tasks_raw[:500],
'ie_history': ie_history[:500],
'prefetch': prefetch[:500],
'shimcache': shimcache[:1000],
'mft': mft_raw[:50], # capped - full data in mft.txt
}
json_path = report_dir / 'report.json'
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=2, default=str)
# ── report.html ───────────────────────────────────────────────────────────
env = Environment(autoescape=False)
tmpl = env.from_string(HTML_TEMPLATE)
# Cap rows passed to HTML to prevent browser freeze
# Large datasets (14k handles, 13M MFT) are in JSON for full access
HTML_ROW_CAPS = {
'network': 500,
'processes': 500,
'services': 500,
'modules': 300,
'drivers': 300,
'handles': 200, # 14k handles would freeze any browser
'cmdline': 500,
'mft': 50, # written to mft.txt - minimal in JSON
'envars': 300,
'scheduled_tasks':300,
'ie_history': 300,
'prefetch': 300,
'shimcache': 500,
}
def cap(data, key):
limit = HTML_ROW_CAPS.get(key, 500)
if isinstance(data, list) and len(data) > limit:
log.info(f"HTML cap: {key} {len(data)} -> {limit} rows (full data in JSON)")
return data[:limit]
return data or []
html = tmpl.render(
meta=meta,
generated_at=datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC'),
risk_label=risk_label,
risk_score=risk_score,
findings=findings_dicts,
iocs=iocs,
summary=summary_data,
network=cap(network_rows, 'network'),
processes=cap(process_rows, 'processes'),
services=cap(services_raw, 'services'),
modules=cap(modules_raw, 'modules'),
drivers=cap(drivers_raw, 'drivers'),
handles=cap(handles_raw, 'handles'),
cmdline=cap(cmdline_raw, 'cmdline'),
mft_count=len(mft_raw) if isinstance(mft_raw, list) else 0,
envars=cap(envars_raw, 'envars'),
scheduled_tasks=cap(tasks_raw, 'scheduled_tasks'),
ie_history=cap(ie_history, 'ie_history'),
prefetch=cap(prefetch, 'prefetch'),
shimcache=cap(shimcache, 'shimcache'),
)
html_path = report_dir / 'report.html'
with open(html_path, 'w', encoding='utf-8') as f:
f.write(html)
# ── summary.txt ───────────────────────────────────────────────────────────
lines = [
"=" * 62,
" MEMORY ANALYSIS SUMMARY",
"=" * 62,
f" Image: {meta['filename']}",
f" SHA256: {meta['sha256'][:32]}...",
f" Size: {meta['size_gb']} GB",
f" OS: {meta.get('os_version','Unknown')}",
f" Vol2 Profile: {meta.get('vol2_profile','Not detected')}",
f" Analysed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}",
"-" * 62,
f" RISK: {risk_label} (Score: {risk_score})",
"-" * 62,
f" Findings: {len(findings)} ({summary_data['critical']} critical, {summary_data['high']} high, {summary_data['medium']} medium)",
f" Yara hits: {summary_data['yara_hits']}",
f" IOCs: {summary_data['ioc_count']}",
f" Network: {len(network_rows)} connections",
f" Processes: {len(process_rows)}",
f" Services: {len(services_raw)}",
f" Drivers: {len(drivers_raw)}",
f" IE History: {len(ie_history)}",
f" Prefetch: {len(prefetch)}",
"-" * 62,
]
if findings_dicts:
lines.append(" TOP FINDINGS:")
for fd in findings_dicts[:15]:
lines.append(f" [{fd['severity']:8}] {fd['title']}")
if fd.get('process'):
lines.append(f" Process: {fd['process']} PID:{fd.get('pid','?')}")
for dl in str(fd.get('detail','')).splitlines()[:1]:
if dl.strip():
lines.append(f" {dl[:78]}")
lines.append("-" * 62)
if iocs.get('yara_hits'):
lines.append(" YARA HITS:")
for hit in iocs['yara_hits']:
lines.append(f" -> {hit}")
lines.append("-" * 62)
if iocs.get('ipv4'):
lines.append(" EXTERNAL IPs:")
for ip in iocs['ipv4']:
lines.append(f" -> {ip}")
lines.append("-" * 62)
lines += [
f" report.html {html_path}",
f" report.json {json_path}",
f" iocs.txt {report_dir / 'iocs.txt'}",
f" dumps/ {dumps_dir}",
"=" * 62,
]
summary_path = report_dir / 'summary.txt'
summary_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
# ── iocs.txt ──────────────────────────────────────────────────────────────
ioc_lines = [
f"IOCs: {meta['filename']}",
f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}",
f"Risk: {risk_label}",
"",
]
ioc_labels = {
'ipv4':'IP Addresses', 'url':'URLs', 'domain':'Domains',
'pipe':'Named Pipes', 'registry_run':'Registry Run Keys',
'hash_sha256':'SHA256 Hashes', 'hash_md5':'MD5 Hashes',
'email':'Email Addresses', 'base64_large':'Base64 Data',
'yara_hits':'Yara Matches',
}
for t, vals in iocs.items():
if vals:
ioc_lines += [f"--- {ioc_labels.get(t,t.replace('_',' ').title())} ({len(vals)}) ---"]
ioc_lines += [str(v) for v in vals]
ioc_lines.append("")
iocs_path = report_dir / 'iocs.txt'
iocs_path.write_text('\n'.join(ioc_lines) + '\n', encoding='utf-8')
# Write MFT to its own file - too large for HTML or JSON
mft_path = report_dir / 'mft.txt'
if isinstance(mft_raw, list) and mft_raw:
with open(mft_path, 'w', encoding='utf-8', errors='replace') as f:
f.write(f"MFT entries from {meta['filename']}\n")
f.write(f"Total entries: {len(mft_raw)}\n")
f.write("=" * 80 + "\n")
for row in mft_raw:
if isinstance(row, dict):
f.write("\t".join(str(v) for v in row.values()) + "\n")
else:
f.write(str(row) + "\n")
log.info(f"MFT written to mft.txt ({len(mft_raw)} entries)")
elif isinstance(mft_raw, str) and mft_raw.strip():
mft_path.write_text(mft_raw, encoding='utf-8', errors='replace')
log.info(f"MFT written to mft.txt")
log.info(f"Reports saved to {report_dir}/")
log.info(f" report.html | report.json | summary.txt | iocs.txt | dumps/")
return {
'report_dir': str(report_dir),
'html': str(html_path),
'json': str(json_path),
'summary': str(summary_path),
'iocs': str(iocs_path),
'dumps': str(dumps_dir),
'mft_txt': str(mft_path) if mft_raw else None,
'risk_label': risk_label,
'risk_score': risk_score,
'finding_count': len(findings),
}
The main entry point: analyse.py
#!/usr/bin/env python3
## /opt/memory-hunter/analyse.py
## THE ONE SCRIPT TO RUN
## Usage: python3 analyse.py /path/to/memory.raw
## Usage: python3 analyse.py /path/to/memory.raw --workers 16 --no-vol2
import sys
import argparse
import logging
import time
import json
from datetime import datetime
from pathlib import Path
# Add scripts directory to path
sys.path.insert(0, str(Path(__file__).parent / 'scripts'))
from phase1_identify import identify_image
from vol_runner import collect_all_plugins
from anomaly_checks import run_all_checks
from phase4_iocs import extract_iocs
from phase5_report import generate_reports
def setup_logging(log_file: str = None) -> None:
handlers = [logging.StreamHandler(sys.stdout)]
if log_file:
handlers.append(logging.FileHandler(log_file))
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=handlers
)
def print_banner():
print("""
╔══════════════════════════════════════════════════════╗
║ Memory Hunter - Automated Analysis ║
║ Volatility 2 + 3 | Windows + Linux | HTML + JSON ║
╚══════════════════════════════════════════════════════╝
""")
def print_summary(report_info: dict, elapsed: float):
risk = report_info['risk_label']
colour = {
'CRITICAL': '\033[91m', 'HIGH': '\033[93m',
'MEDIUM': '\033[33m', 'LOW': '\033[92m'
}.get(risk, '')
reset = '\033[0m'
print(f"""
{'='*60}
ANALYSIS COMPLETE ({elapsed:.1f} seconds)
{'='*60}
Risk Level: {colour}{risk}{reset}
Risk Score: {report_info['risk_score']}
Findings: {report_info['finding_count']}
HTML Report: {report_info['html']}
JSON Report: {report_info['json']}
{'='*60}
""")
def main():
print_banner()
parser = argparse.ArgumentParser(
description='Automated memory image analysis for threat hunters'
)
parser.add_argument('image', help='Path to memory image file')
parser.add_argument('--workers', type=int, default=8,
help='Parallel plugin workers (default: 8)')
parser.add_argument('--no-vol2', action='store_true',
help='Skip Volatility 2 plugins')
parser.add_argument('--output-dir', default='/opt/memory-hunter/reports',
help='Report output directory')
parser.add_argument('--yara-rules', default='/opt/memory-hunter/yara_rules/combined.yar',
help='Path to compiled Yara rules')
parser.add_argument('--log-file', help='Write log to file')
parser.add_argument('--quiet', action='store_true',
help='Reduce output verbosity')
args = parser.parse_args()
setup_logging(args.log_file)
log = logging.getLogger(__name__)
start_time = time.time()
image_path = str(Path(args.image).absolute())
print(f"[*] Image: {image_path}")
print(f"[*] Workers: {args.workers}")
print()
# ── Phase 1: Identify ────────────────────────────────────────────────────
print("[Phase 1/5] Identifying image...")
meta = identify_image(image_path)
print(f" OS Type: {meta.get('os_type', 'unknown')}")
print(f" OS Version: {meta.get('os_version', 'unknown')}")
print(f" Size: {meta.get('size_gb', 0):.1f} GB")
print(f" SHA256: {meta.get('sha256', '')[:32]}...")
print()
if not meta.get('os_type'):
print("[!] Could not identify OS type - check symbol tables")
print(" Windows: ensure /opt/vol3-symbols/windows/ is populated")
print(" Linux: ensure ISF file exists for this kernel version")
sys.exit(1)
# ── Phase 2: Collect ─────────────────────────────────────────────────────
print(f"[Phase 2/5] Running plugin collection ({args.workers} workers)...")
plugin_results = collect_all_plugins(image_path, meta, args.workers)
print(f" Plugins completed: {len(plugin_results)}")
print()
# ── Phase 3: Detect ──────────────────────────────────────────────────────
print("[Phase 3/5] Running anomaly detection...")
findings, risk_score = run_all_checks(plugin_results)
critical = sum(1 for f in findings if f.severity == 'CRITICAL')
high = sum(1 for f in findings if f.severity == 'HIGH')
print(f" Findings: {len(findings)} total ({critical} critical, {high} high)")
if findings:
print(" Top findings:")
for f in findings[:5]:
print(f" [{f.severity}] {f.title}")
print()
# ── Phase 4: IOCs ────────────────────────────────────────────────────────
print("[Phase 4/5] Extracting IOCs...")
suspicious_pids = [f.pid for f in findings if f.pid]
iocs = extract_iocs(plugin_results, image_path, suspicious_pids)
ioc_count = sum(len(v) for v in iocs.values())
print(f" IOCs extracted: {ioc_count}")
if iocs.get('yara_hits'):
print(f" YARA hits: {len(iocs['yara_hits'])}")
for hit in iocs['yara_hits'][:3]:
print(f" -> {hit}")
print()
# ── Phase 5: Report ──────────────────────────────────────────────────────
print("[Phase 5/5] Generating reports...")
report_info = generate_reports(meta, findings, iocs, plugin_results)
elapsed = time.time() - start_time
print_summary(report_info, elapsed)
# Exit code reflects risk level for CI/CD integration
exit_codes = {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}
sys.exit(exit_codes.get(report_info['risk_label'], 0))
if __name__ == '__main__':
main()
Making it truly one command to run
## Install the script system-wide
sudo ln -sf /opt/memory-hunter/analyse.py /usr/local/bin/memory-hunt
sudo chmod +x /opt/memory-hunter/analyse.py
## Now you can run from anywhere:
memory-hunt /path/to/image.raw
## With options:
memory-hunt /path/to/image.raw --workers 16 --log-file /tmp/analysis.log
## In a pipeline (exit code reflects risk):
memory-hunt suspicious.raw && echo "CLEAN" || echo "THREATS FOUND"
## Process multiple images in parallel
ls /srv/memory/landing/*.raw | \
parallel -j 4 memory-hunt {} --log-file /srv/memory/logs/{/.}.log
## Quick check with reduced scope (faster for initial triage)
memory-hunt image.raw --workers 4 --no-vol2
Linux image support and ISF generation
## Linux memory analysis requires ISF (Intermediate Symbol Format) files
## These must be generated for each specific kernel version being analysed
## Method 1: Generate ISF from a running system (same kernel as the image)
## Install dwarf2json
wget https://github.com/volatilityfoundation/dwarf2json/releases/latest/download/dwarf2json-linux-amd64
chmod +x dwarf2json-linux-amd64
## Generate ISF from the running kernel (on the target or identical system)
sudo ./dwarf2json-linux-amd64 linux \
--elf /usr/lib/debug/boot/vmlinux-$(uname -r) \
> /opt/vol3-symbols/linux/$(uname -r).json
## Method 2: Generate from vmlinux debug symbols package
## On Ubuntu/Debian:
sudo apt install linux-image-$(uname -r)-dbgsym 2>/dev/null || \
sudo apt install linux-image-$(uname -r)-dbg
sudo ./dwarf2json-linux-amd64 linux \
--elf /usr/lib/debug/boot/vmlinux-$(uname -r) \
--system-map /boot/System.map-$(uname -r) \
> /opt/vol3-symbols/linux/$(uname -r).json
## Verify Volatility can use the ISF
vol -f linux_memory.lime linux.pslist
## Automating ISF generation for a fleet
## Run this on each unique kernel version in your environment
python3 << 'EOF'
import subprocess, os, sys
from pathlib import Path
ISF_DIR = Path('/opt/vol3-symbols/linux')
ISF_DIR.mkdir(parents=True, exist_ok=True)
kernel_version = subprocess.run(['uname', '-r'],
capture_output=True, text=True).stdout.strip()
isf_path = ISF_DIR / f"{kernel_version}.json"
if isf_path.exists():
print(f"ISF already exists for {kernel_version}")
sys.exit(0)
# Try to find vmlinux debug symbols
vmlinux_paths = [
f'/usr/lib/debug/boot/vmlinux-{kernel_version}',
f'/usr/lib/debug/lib/modules/{kernel_version}/vmlinux',
f'/boot/vmlinux-{kernel_version}',
]
vmlinux = next((p for p in vmlinux_paths if os.path.exists(p)), None)
if not vmlinux:
print(f"No debug symbols found for {kernel_version}")
print(f"Install: apt install linux-image-{kernel_version}-dbgsym")
sys.exit(1)
result = subprocess.run([
'/opt/dwarf2json-linux-amd64', 'linux',
'--elf', vmlinux,
], capture_output=True, timeout=300)
if result.returncode == 0:
isf_path.write_bytes(result.stdout)
print(f"ISF generated: {isf_path} ({len(result.stdout)//1024}KB)")
else:
print(f"dwarf2json failed: {result.stderr}")
sys.exit(1)
EOF
Troubleshooting the automation pipeline
## Common issues and their fixes
## Issue 1: "Unsatisfied requirement" errors from Volatility 3
## This means symbol tables are missing or mislinked
source /opt/vol3-env/bin/activate
python3 -c "
import volatility3.symbols as sym
import os
sym_path = os.path.dirname(sym.__file__)
win_path = os.path.join(sym_path, 'windows')
print(f'Symbol path: {sym_path}')
print(f'Windows syms: {os.path.isdir(win_path)}')
if os.path.isdir(win_path):
files = os.listdir(win_path)
print(f'Files: {len(files)} (sample: {files[:2]})')
"
## Fix: re-link symbols
SITE=$(python3 -c "import site; print(site.getsitepackages()[0])")
ln -sf /opt/vol3-symbols/windows $SITE/volatility3/symbols/windows
## Issue 2: Plugin times out on large images
## Increase timeout values in vol_runner.py or limit to core plugins only
## The --workers flag does not help here - timeouts are per-plugin
## Issue 3: Yara scan returns no results despite known malware in image
## Check compiled ruleset is not corrupted
python3 -c "
import yara
try:
rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
print('Ruleset loaded OK')
except Exception as e:
print(f'Error: {e} - recompile the ruleset')
"
## Recompile from source rules:
python3 -c "
import yara, glob
rule_files = {}
for f in glob.glob('/opt/memory-hunter/yara_rules/rules/*.yar'):
rule_files[f.split('/')[-1].replace('.yar','')] = f
combined = yara.compile(filepaths=rule_files)
combined.save('/opt/memory-hunter/yara_rules/combined.yar')
print(f'Compiled {len(rule_files)} rule files')
"
## Issue 4: HTML report renders but shows no data
## The template assumes Vol3 list format [pid, ppid, name, ...path at index 10]
## Different Vol3 versions may change column ordering
## Debug by checking raw plugin output:
vol -f image.raw --renderer json windows.pslist | \
python3 -c "import json,sys; d=json.load(sys.stdin); print(d['columns'])"
## Issue 5: analyse.py runs but risk score is always 0
## Check anomaly_checks.py is receiving data by printing plugin result sizes:
python3 -c "
import sys; sys.path.insert(0,'/opt/memory-hunter/scripts')
from vol_runner import collect_all_plugins
from phase1_identify import identify_image
meta = identify_image(sys.argv[1])
results = collect_all_plugins(sys.argv[1], meta, 4)
for name, data in sorted(results.items()):
count = len(data) if isinstance(data, list) else len(data.splitlines())
print(f'{name}: {count} records')
" /path/to/image.raw
Integrating with the fleet pipeline
The single-image automation script integrates cleanly with the fleet collection pipeline described in the companion post. When the image watcher detects a new image it can call the analyse.py script directly instead of the Celery task chain, which is useful for simpler deployments that do not need the full distributed pipeline.
## Simple integration with the image watcher
## Replace the validate_image.delay() call with a direct script invocation
## In image_watcher.py, replace:
## validate_image.delay(image_id)
## With:
import subprocess
subprocess.Popen([
'/opt/vol3-env/bin/python3',
'/opt/memory-hunter/analyse.py',
str(path),
'--workers', '8',
'--log-file', f'/srv/memory/logs/{hostname}_{timestamp}.log',
'--output-dir', f'/srv/memory/reports/{hostname}',
])
## The exit code from analyse.py maps to risk level:
## 0 = LOW (no concerning findings)
## 1 = MEDIUM
## 2 = HIGH
## 3 = CRITICAL
## Use this in automation to trigger different response actions
## Example: auto-isolate a host if CRITICAL findings
result = subprocess.run([
'/opt/vol3-env/bin/python3',
'/opt/memory-hunter/analyse.py',
image_path,
], capture_output=True)
if result.returncode == 3:
log.warning(f"CRITICAL findings in {hostname} - triggering isolation workflow")
# Call your EDR/firewall API to isolate the host
isolate_host(hostname)
elif result.returncode == 2:
log.warning(f"HIGH findings in {hostname} - notifying SOC")
notify_soc(hostname)
The HTML report template in full
The report template referenced in phase5_report.py is a Jinja2 template that lives at /opt/memory-hunter/templates/report.html.j2. The template code was included inline in the phase5_report.py listing above as the HTML_TEMPLATE string. To use it as a standalone file instead, replace the inline string with a file load:
## In phase5_report.py, replace the HTML_TEMPLATE string with:
from jinja2 import Environment, FileSystemLoader
env = Environment(loader=FileSystemLoader('/opt/memory-hunter/templates'))
tmpl = env.get_template('report.html.j2')
html = tmpl.render(...)
Save the template content from the HTML_TEMPLATE variable in phase5_report.py to /opt/memory-hunter/templates/report.html.j2. The template uses standard Jinja2 syntax throughout: {{ variable }} for output, {% for item in list %} for loops, {% if condition %} for conditionals. No additional template dependencies are needed beyond Jinja2 itself.
Yara rules: complete content for all four rule files
These are the four rule files that belong in /opt/memory-hunter/yara_rules/rules/. Each focuses on a different threat category relevant to Windows endpoint memory analysis.
## File: cobalt_strike.yar
// Cobalt Strike beacon detection rules
rule CobaltStrike_Beacon_Config_Decoded {
meta:
description = "Detects decoded Cobalt Strike beacon configuration in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
strings:
$cfg_header = { 00 01 00 01 00 00 00 ?? 00 02 00 01 }
$uri_check = "/updates/check" ascii wide
$uri_submit = "/submit.php" ascii wide
$uri_cdn = "/CDN/" ascii wide
$sleep_mask = { C7 44 24 ?? 01 00 00 00 EB ?? }
$ref_loader = "ReflectiveLoader" ascii fullword
$pipe_msse = "\\.\pipe\MSSE-" ascii wide
$pipe_postex = "\\.\pipe\postex_" ascii wide
$pipe_status = "\\.\pipe\status_" ascii wide
$watermark = { 00 27 00 01 }
condition:
($ref_loader or $sleep_mask) and
1 of ($uri_check, $uri_submit, $uri_cdn, $cfg_header, $watermark)
or 2 of ($pipe_msse, $pipe_postex, $pipe_status)
}
rule CobaltStrike_Shellcode_Stager {
meta:
description = "Detects Cobalt Strike shellcode stager in memory"
confidence = "medium"
strings:
$stager_x64 = { FC 48 83 E4 F0 E8 C0 00 00 00 }
$stager_x86 = { FC E8 82 00 00 00 60 89 E5 }
condition:
any of them
}
rule CobaltStrike_MalleableC2_Indicators {
meta:
description = "Detects indicators of Cobalt Strike Malleable C2 profiles"
confidence = "low"
strings:
$ref_loader = "ReflectiveLoader" ascii fullword
$sleep_mask = { C7 44 24 ?? 01 00 00 00 EB ?? }
$amz_host = "s3.amazonaws.com" ascii wide
$o365 = "outlook.office365.com" ascii wide
$ua_excel = "Microsoft Excel" ascii wide
$ua_teams = "Teams/1." ascii wide
condition:
($ref_loader or $sleep_mask) and
1 of ($amz_host, $o365, $ua_excel, $ua_teams)
}
## File: meterpreter.yar
// Meterpreter detection rules
rule Meterpreter_Reflective_DLL_x64 {
meta:
description = "Detects Meterpreter x64 reflective DLL loaded in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
strings:
$mz = { 4D 5A }
$ref_loader = "ReflectiveLoader" ascii fullword
$stdapi = "stdapi_" ascii
$priv = "priv_elevate" ascii
$incognito = "incognito_" ascii
$kiwi = "kiwi_cmd" ascii
$transport = "METERPRETER_TRANSPORT_" ascii
$pivot = "pivot_" ascii
$session_chan = "MeterpreterSession" ascii wide nocase
condition:
$mz at 0 and $ref_loader and
2 of ($stdapi, $priv, $incognito, $kiwi, $transport, $pivot, $session_chan)
}
rule Meterpreter_Shellcode_Reverse_TCP {
meta:
description = "Detects Meterpreter reverse TCP shellcode in memory"
confidence = "high"
strings:
$rev_tcp_x64 = { 49 BE ?? ?? ?? ?? ?? ?? ?? ?? 41 FF E6 }
$lib_resolve = { 48 31 C9 48 81 EC D0 00 00 00 }
condition:
any of them
}
rule Meterpreter_Python_Stage {
meta:
description = "Detects Python Meterpreter stage in memory"
confidence = "medium"
strings:
$py_met1 = "met_api" ascii
$py_met2 = "meterpreter.core" ascii
$py_met3 = "MeterpreterSession" ascii
$py_met4 = "from metasploit" ascii nocase
condition:
2 of them
}
## File: credential_tools.yar
// Credential access tool detection rules
rule Mimikatz_In_Memory {
meta:
description = "Detects Mimikatz and variants loaded in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
strings:
$sekurlsa = "sekurlsa::" ascii wide nocase
$lsadump = "lsadump::" ascii wide nocase
$kerberos = "kerberos::" ascii wide nocase
$crypto = "crypto::" ascii wide nocase
$dpapi = "dpapi::" ascii wide nocase
$mimikatz = "mimikatz" ascii wide nocase
$priv_debug = "privilege::debug" ascii wide nocase
$logonpw = "logonPasswords" ascii wide
$wdigest = "wdigest.dll" ascii wide
$lsasrv = "lsasrv.dll" ascii wide
$ntlm_hash = "NTLM hash" ascii wide nocase
$aes256_key = "AES256 HMAC" ascii wide
condition:
2 of ($sekurlsa, $lsadump, $kerberos, $crypto, $dpapi, $mimikatz, $priv_debug)
or ($wdigest and $lsasrv and 1 of ($sekurlsa, $lsadump, $logonpw))
or ($ntlm_hash and $aes256_key and $wdigest)
}
rule Rubeus_Kerberos_Toolkit {
meta:
description = "Detects Rubeus .NET Kerberos attack toolkit in memory"
confidence = "high"
strings:
$rubeus_id = "Rubeus" ascii wide
$asktgt = "asktgt" ascii wide nocase
$kerberoast = "kerberoast" ascii wide nocase
$asreproast = "asreproast" ascii wide nocase
$s4u = " s4u " ascii wide nocase
$ptt = "ptt" ascii wide
$harvest = "harvest" ascii wide nocase
$monitor = "monitor" ascii wide nocase
$dump_cmd = "dump" ascii wide
$dotnet = { 4D 5A 90 00 03 00 00 00 04 00 00 00 FF FF 00 00 }
condition:
$rubeus_id and $dotnet and
2 of ($asktgt, $kerberoast, $asreproast, $s4u, $ptt, $harvest, $monitor, $dump_cmd)
}
rule SharpHound_BloodHound_Collector {
meta:
description = "Detects SharpHound/BloodHound AD enumeration tool in memory"
confidence = "high"
strings:
$sh1 = "SharpHound" ascii wide nocase
$sh2 = "BloodHound" ascii wide nocase
$sh3 = "Invoke-BloodHound" ascii wide nocase
$ldap1 = "GetAllDomainTrusts" ascii wide
$ldap2 = "GetDomainControllers" ascii wide
$ldap3 = "GetDomainComputers" ascii wide
$ldap4 = "LdapSearcher" ascii wide
$zip = "BloodHound.zip" ascii wide
$json1 = "computers.json" ascii wide
$json2 = "users.json" ascii wide
$json3 = "groups.json" ascii wide
condition:
1 of ($sh1, $sh2, $sh3)
or 3 of ($ldap1, $ldap2, $ldap3, $ldap4, $zip, $json1, $json2, $json3)
}
rule NanoDump_LSASS_Dumper {
meta:
description = "Detects NanoDump or similar LSASS dumping tools in memory"
confidence = "high"
strings:
$nano1 = "nanodump" ascii wide nocase
$nano2 = "NanoDump" ascii wide
$syscall_pattern = { 4C 8B D1 B8 ?? 00 00 00 0F 05 C3 }
$minidump = "MiniDumpWriteDump" ascii wide
$lsass_name = "lsass.exe" ascii wide nocase
condition:
$nano1 or $nano2
or ($syscall_pattern and $lsass_name)
or ($minidump and $lsass_name and $syscall_pattern)
}
rule Seatbelt_Recon_Tool {
meta:
description = "Detects Seatbelt post-exploitation recon tool in memory"
confidence = "high"
strings:
$sb1 = "Seatbelt" ascii wide
$sb2 = "WindowsCredentialFiles" ascii wide
$sb3 = "DpapiMasterKeys" ascii wide
$sb4 = "RDPSavedConnections" ascii wide
$sb5 = "NetworkProfiles" ascii wide
$sb6 = "TokenPrivileges" ascii wide
$sb7 = "PowerShellHistory" ascii wide
condition:
$sb1 or 4 of ($sb2, $sb3, $sb4, $sb5, $sb6, $sb7)
}
## File: generic_suspicious.yar
// Generic suspicious pattern detection rules
rule RWX_PE_In_Anonymous_Memory {
meta:
description = "PE file in executable anonymous memory - possible reflective loading"
author = "justruss"
date = "2026-05-24"
confidence = "medium"
strings:
$mz_header = { 4D 5A 90 00 }
$pe_sig = { 50 45 00 00 }
condition:
$mz_header at 0 and $pe_sig
}
rule Shellcode_Common_x64_Preambles {
meta:
description = "Common x64 shellcode entry patterns in executable memory"
confidence = "medium"
strings:
$preamble1 = { FC 48 83 E4 F0 E8 }
$getpc1 = { E8 00 00 00 00 59 }
$getpc2 = { E8 00 00 00 00 5B }
$peb_walk = { 64 48 8B 04 25 60 00 00 00 }
condition:
any of them
}
rule Suspicious_Named_Pipe {
meta:
description = "Named pipe patterns associated with common C2 frameworks"
confidence = "high"
strings:
$cs_msse = "\\.\pipe\MSSE-" ascii wide
$cs_postex = "\\.\pipe\postex_" ascii wide
$cs_msagent = "\\.\pipe\msagent_" ascii wide
$cs_status = "\\.\pipe\status_" ascii wide
$msf_pipe = "\\.\pipe\metsrv" ascii wide
$empire_pipe = "\\.\pipe\empire" ascii wide nocase
condition:
any of them
}
rule AMSI_Bypass_Patterns {
meta:
description = "Detects common AMSI bypass technique byte patterns in memory"
confidence = "high"
strings:
$amsi_patch1 = { B8 57 00 07 80 C3 }
$amsi_str = "amsi.dll" ascii wide nocase
$amsi_func = "AmsiScanBuffer" ascii wide
condition:
$amsi_patch1 and ($amsi_str or $amsi_func)
}
rule ETW_Tamper_Patterns {
meta:
description = "Detects ETW patching techniques in process memory"
confidence = "high"
strings:
$etw_patch = { C2 14 00 }
$etw_func = "EtwEventWrite" ascii wide
$etw_disable = "EtwEventUnregister" ascii wide
condition:
$etw_patch and ($etw_func or $etw_disable)
}
rule PowerShell_Download_Cradle {
meta:
description = "Detects decoded PowerShell download cradles in process memory"
confidence = "medium"
strings:
$dl_string = "DownloadString" ascii wide nocase
$dl_file = "DownloadFile" ascii wide nocase
$webclient = "Net.WebClient" ascii wide nocase
$iex = "IEX" ascii wide
$invoke_exp = "Invoke-Expression" ascii wide nocase
$ref_load = "[Reflection.Assembly]::Load" ascii wide
condition:
2 of them
}
Compiling the combined Yara ruleset
After placing all four files in the rules directory, compile them into a single binary ruleset that Volatility’s vadyarascan plugin can load. The compiled format is faster to load than re-parsing individual text files on every scan.
## Compile all rules into combined.yar
source /opt/vol3-env/bin/activate
python3 < {OUTPUT}")
print(f"Ruleset size: {Path(OUTPUT).stat().st_size / 1024:.1f} KB")
EOF
## Verify the compiled ruleset loads correctly
python3 -c "
import yara
rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
print('Ruleset loaded OK')
print(f'Rules available for memory scanning')
"
## Test against a known-clean binary to check false positive rate
python3 -c "
import yara
rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
import os
test_files = ['/bin/ls', '/bin/cat', '/usr/bin/python3']
for f in test_files:
if os.path.exists(f):
matches = rules.match(f)
if matches:
print(f'FP WARNING: {f} matched {[m.rule for m in matches]}')
else:
print(f'Clean: {f}')
"
Adding your own Yara rules
The four files above are a starting point covering the most commonly encountered C2 frameworks and credential theft tools. As you encounter new malware families or build rules from your own analysis, add new .yar files to the rules directory and recompile. A few practical notes on writing rules that work well in a memory scanning context.
Avoid filesize conditions entirely since Yara scanning process memory regions does not have a meaningful file size. Avoid conditions that depend on PE structure offsets like pe.entry_point unless you are certain the region you are scanning is a complete PE and not a fragment or raw shellcode. String conditions that use fullword are more reliable in memory than substring matches because memory contains a lot of incidental short strings. For strings that appear in both legitimate software and malware (like “cmd.exe” or “powershell”), always pair them with at least one other more unique indicator before the rule can match. A condition that requires three or more strings is almost always more reliable than one requiring a single string, even if the single string seems highly distinctive on the initial sample.
The clean_samples directory at /opt/memory-hunter/yara_rules/clean_samples/ is intended for known-clean Windows executables that you run new rules against before deploying them. A useful set to maintain there: a clean copy of common system binaries (ntdll.dll, kernel32.dll, powershell.exe), clean copies of legitimate admin tools (PsExec, Process Explorer), and a small set of benign .NET assemblies. Running every new rule against these before adding it to the compiled ruleset catches false positives before they generate noise in production scans.
The complete repository structure
## Clone-ready repository layout
## All scripts from this post organised for immediate use
/opt/memory-hunter/
├── analyse.py # Main entry point - run this
├── setup_volatility_dual.sh # One-shot environment setup
├── requirements.txt # Python dependencies
├── README.md # Quick start guide
│
├── scripts/
│ ├── phase1_identify.py # OS detection and image metadata
│ ├── vol_runner.py # Vol2/Vol3 abstraction + parallel collection
│ ├── anomaly_checks.py # Detection logic and Finding class
│ ├── phase4_iocs.py # IOC extraction and Yara scanning
│ └── phase5_report.py # HTML and JSON report generation
│
├── templates/
│ └── report.html.j2 # Jinja2 HTML report template
│
├── yara_rules/
│ ├── rules/ # Individual .yar files (add yours here)
│ │ ├── cobalt_strike.yar
│ │ ├── meterpreter.yar
│ │ ├── credential_tools.yar
│ │ └── generic_suspicious.yar
│ ├── combined.yar # Compiled ruleset (auto-generated)
│ └── clean_samples/ # Known-clean files for FP testing
│
├── reports/ # Analysis output (gitignored)
└── logs/ # Run logs (gitignored)
## requirements.txt
volatility3
yara-python
pefile
capstone
python-magic
requests
jinja2
tqdm
tabulate
colorama
## Quick start
git clone https://github.com/yourrepo/memory-hunter /opt/memory-hunter
cd /opt/memory-hunter
bash setup_volatility_dual.sh
ln -sf /opt/memory-hunter/analyse.py /usr/local/bin/memory-hunt
## Compile Yara rules
python3 -c "
import yara, glob
rule_files = {f.split('/')[-1].replace('.yar',''): f
for f in glob.glob('/opt/memory-hunter/yara_rules/rules/*.yar')}
yara.compile(filepaths=rule_files).save('/opt/memory-hunter/yara_rules/combined.yar')
print(f'Compiled {len(rule_files)} rule files')
"
## Run your first analysis
memory-hunt /path/to/memory.raw
The pipeline produces two output files per image: a structured JSON file suitable for SIEM ingestion, scripted comparison across multiple images, or feeding into a correlation pipeline, and an HTML report that opens in any browser with colour-coded severity, IOC tables, full process list, and network connections. The HTML report is designed to be shared with stakeholders who need to understand the findings without running any tools themselves.
The most important design decision in the whole pipeline is the exit code contract. By mapping risk levels to exit codes (0 for clean, 3 for critical), the script integrates cleanly into any automation that knows how to act on a process exit code. Shell scripts, CI/CD pipelines, the image watcher daemon, and orchestration frameworks all speak exit codes natively. A threat hunter who wants to process fifty images and immediately see which ones need attention can run a single parallel command and act on the results without reading any output until something non-zero comes back.