A memory image taken from a live system is the closest thing to a complete picture of what was happening on that machine at a specific moment in time. Everything that was running, communicating, injected, or hiding is captured in those bytes. The challenge is that extracting useful intelligence from a raw memory image manually is slow: you run Volatility plugins one at a time, copy outputs into notes, and correlate findings by hand. A thorough manual analysis of a single 16GB image can take three to four hours for an experienced analyst. When you are hunting across multiple endpoints, or when you need to answer the question “is anything malicious in this image” quickly enough to act on the result, manual analysis does not scale.
This post covers building a single script that accepts a memory image path and produces a comprehensive, prioritised threat hunting report without any further analyst input. You drop an image in, you get a report out. The script handles plugin selection, parallel execution, result parsing, anomaly detection, IOC extraction, cross-referencing between plugins, and report generation. It handles both Windows and Linux images, uses Volatility 2 where it provides unique capability and Volatility 3 for everything else, and produces both a shareable HTML report and structured JSON for SIEM ingestion.
Volatility 2 versus Volatility 3: understanding when to use each
The Volatility project has two major versions in active use. Volatility 3 is the current version, written from scratch with a cleaner architecture, better Python 3 support, and no dependency on a manually selected profile. Volatility 2 is the older version, maintained for specific use cases where it still provides capabilities or plugin depth that Volatility 3 does not yet match. Understanding the practical differences between them determines which one to call for each analysis task.
Volatility 3 should be your default for almost everything. It automatically identifies the OS version and kernel structures from the image without requiring a manually matched profile, which eliminates the most common source of beginner errors (wrong profile). It typically runs faster on large images because of improved memory mapping. When invoked with the --renderer json option, every plugin returns its results in the same JSON structure, which makes automation straightforward. The symbol table system is more robust than Volatility 2’s profile system for modern Windows versions.
Volatility 2 retains specific advantages that keep it relevant for threat hunters:
# Where Volatility 2 still provides unique value
# 1. Some plugins simply do not exist yet in Volatility 3
# - mftparser: deep MFT parsing from memory
# - shellbags: user navigation history from registry in memory
# - iehistory: Internet Explorer artefacts from process memory
# - shimcache: application compatibility cache from memory
# - prefetchparser: prefetch execution records from memory
# 2. Certain Linux plugins are more mature in Volatility 2
# - linux_psaux: full command line for Linux processes (more reliable)
# - linux_netstat: Linux network connections with more detail
# - linux_check_syscall: syscall table hook detection
# 3. Legacy Windows support (XP, Vista, 2003, 2008)
# Volatility 3 symbol tables may not exist for very old OS versions
# Volatility 2 with the correct profile covers these reliably
# 4. Certain rootkit detection plugins
# - modscan: raw kernel module scanner (sometimes finds what the modules list misses)
# - ssdt: SSDT hook detection (useful on older Windows)
# - apihooks: API hook detection in process memory
# Check what is available in each version
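# (paths match the setup script in the next section; substitute the vol2 Docker wrapper if you use it)
python2 /opt/volatility2/vol.py --info | grep "^[A-Z]" | sort # Volatility 2 plugins
/opt/vol3-env/bin/vol --help 2>&1 | grep "windows\." | sort # Volatility 3 Windows plugins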
The automation script uses both. Volatility 3 for the core analysis pipeline where it is faster and more reliable. Volatility 2 for specific supplementary plugins where it provides unique value. The script detects which tools are available and gracefully skips plugins that cannot run rather than failing entirely.
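The "detect and gracefully skip" behaviour can be sketched in a few lines. This is a minimal illustration, not the script's actual implementation: the helper names detect_tools and runnable are hypothetical, and the default paths are the ones assumed by the setup script in this post.

```python
import shutil
from pathlib import Path
from typing import Dict, List, Tuple


def detect_tools(vol3_path: str = '/opt/vol3-env/bin/vol',
                 vol2_script: str = '/opt/volatility2/vol.py') -> Dict[str, bool]:
    """Report which Volatility front ends look usable on this host."""
    vol3 = Path(vol3_path).is_file() or shutil.which('vol') is not None
    # Native Volatility 2 needs both the checkout and a python2 interpreter
    vol2_native = Path(vol2_script).is_file() and shutil.which('python2') is not None
    # The Docker wrapper, if installed, is exposed as "vol2" on PATH
    vol2_wrapper = shutil.which('vol2') is not None
    return {'vol3': vol3, 'vol2': vol2_native or vol2_wrapper}


def runnable(plugins: List[Tuple[str, str]],
             tools: Dict[str, bool]) -> List[Tuple[str, str]]:
    """Keep only (tool, plugin) pairs whose tool is available; skip the rest."""
    return [(tool, name) for tool, name in plugins if tools.get(tool)]


if __name__ == '__main__':
    tools = detect_tools()
    wanted = [('vol3', 'windows.pslist'), ('vol2', 'shimcache')]
    print(tools, runnable(wanted, tools))
```

Filtering the plugin list up front means a missing tool shortens the run instead of aborting it.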
Setting up both versions side by side
#!/bin/bash
## setup_volatility_dual.sh
## Install Volatility 2 and 3 side by side
## Ubuntu 22.04 - run as root or with sudo
set -euo pipefail
echo "[*] Setting up dual Volatility environment"
## ── Volatility 3 ─────────────────────────────────────────────────────────────
python3 -m venv /opt/vol3-env
source /opt/vol3-env/bin/activate
pip install --upgrade pip -q
pip install volatility3 yara-python pefile capstone python-magic requests -q
# Symbol tables for Windows analysis
mkdir -p /opt/vol3-symbols
if [ ! -d /opt/vol3-symbols/windows ]; then
echo "[*] Downloading Windows symbol tables (~500MB)"
wget -q --show-progress \
https://downloads.volatilityfoundation.org/volatility3/symbols/windows.zip \
-O /tmp/windows_syms.zip
unzip -q /tmp/windows_syms.zip -d /opt/vol3-symbols/
rm /tmp/windows_syms.zip
fi
# Link symbols into Volatility 3
SITE3=$(python3 -c "import site; print(site.getsitepackages()[0])")
ln -sf /opt/vol3-symbols/windows $SITE3/volatility3/symbols/windows 2>/dev/null || true
# Linux symbol tables (requires ISF files generated from target kernel)
# These must be generated for each specific kernel version being analysed
# See: https://github.com/volatilityfoundation/dwarf2json
mkdir -p /opt/vol3-symbols/linux
# Report the installed version (a heredoc terminator cannot share a line with the closing parenthesis)
echo "[+] Volatility 3: $(python3 -c 'from importlib.metadata import version; print(version("volatility3"))')"
deactivate
## ── Volatility 2 ─────────────────────────────────────────────────────────────
# Volatility 2 requires Python 2.7 (use pyenv or system python2)
apt-get install -y -qq python2 python2-dev python-is-python2 2>/dev/null || \
apt-get install -y -qq python2.7 python2.7-dev 2>/dev/null || true
# If python2 is not available, use Docker instead
if command -v python2 &>/dev/null; then
pip2 install --quiet distorm3 pycryptodome 2>/dev/null || true
git clone --depth 1 https://github.com/volatilityfoundation/volatility.git \
/opt/volatility2 2>/dev/null || (cd /opt/volatility2 && git pull)
echo "[+] Volatility 2: available via python2 /opt/volatility2/vol.py"
else
echo "[!] Python 2 not available - using Docker for Volatility 2"
docker pull remnux/volatility 2>/dev/null || true
cat > /usr/local/bin/vol2 << 'VOL2SCRIPT'
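#!/bin/bash
# Usage: vol2 -f /path/to/image [volatility args...]
# Mounts the image's directory at /data and rewrites the -f path to match.
args=()
while [ $# -gt 0 ]; do
  if [ "$1" = "-f" ] && [ $# -ge 2 ]; then
    img="$(realpath "$2")"
    mount_dir="$(dirname "$img")"
    args+=("-f" "/data/$(basename "$img")")
    shift 2
  else
    args+=("$1")
    shift
  fi
done
docker run --rm -v "${mount_dir:-$PWD}:/data:ro" \
  remnux/volatility "${args[@]}"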
VOL2SCRIPT
chmod +x /usr/local/bin/vol2
fi
## ── Supporting tools ─────────────────────────────────────────────────────────
source /opt/vol3-env/bin/activate
pip install -q \
jinja2 \
requests \
psycopg2-binary \
colorama \
tqdm \
tabulate
deactivate
# Install yara, foremost, jq from apt
apt-get install -y -qq yara foremost jq
# bulk-extractor must be built from source
apt-get install -y -qq libssl-dev libewf-dev libexpat1-dev build-essential autoconf automake libtool git
cd /tmp
git clone --recurse-submodules https://github.com/simsong/bulk_extractor.git
cd bulk_extractor
./bootstrap.sh
./configure
make -j$(nproc)
make install
cd /tmp && rm -rf bulk_extractor
echo "[+] bulk-extractor: $(bulk_extractor --version 2>&1 | head -1)"
echo ""
echo "[+] Setup complete"
echo " Volatility 3: vol (in /opt/vol3-env/bin/)"
echo " Volatility 2: python2 /opt/volatility2/vol.py (or vol2 Docker wrapper)"
echo " Symbol tables: /opt/vol3-symbols/"
The automation architecture
The script runs in five sequential phases. Each phase feeds its results into the next. Phase 1 identifies the image and OS type. Phase 2 runs a broad plugin sweep to collect all available data. Phase 3 applies anomaly detection logic across the collected data to score and prioritise findings. Phase 4 extracts and enriches IOCs. Phase 5 generates the output reports. The whole process runs in parallel where possible, with non-dependent plugins executing concurrently, which reduces total analysis time significantly compared to sequential execution.
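As a rough sketch of how the phases chain together, the driver below threads one shared context dict through a list of phase callables, so each phase reads what earlier phases produced. The function run_pipeline and the stub phases are hypothetical stand-ins, not the contents of analyse.py.

```python
import logging
import time
from typing import Callable, Dict, List, Tuple

log = logging.getLogger('memory-hunter')

# A phase takes the accumulated context and returns the keys it adds
Phase = Tuple[str, Callable[[Dict], Dict]]


def run_pipeline(image_path: str, phases: List[Phase]) -> Dict:
    """Run phases in order, merging each phase's output into one context."""
    ctx: Dict = {'image_path': image_path, 'timings': {}}
    for name, phase in phases:
        started = time.monotonic()
        ctx.update(phase(ctx))
        ctx['timings'][name] = round(time.monotonic() - started, 3)
        log.info('phase %s finished in %.3fs', name, ctx['timings'][name])
    return ctx


# Stub phases standing in for the real modules (illustrative only)
demo_phases: List[Phase] = [
    ('identify', lambda ctx: {'image_info': {'os_type': 'windows'}}),
    ('collect', lambda ctx: {'plugin_results': {'windows.pslist': []}}),
    ('detect', lambda ctx: {'findings': [], 'risk_score': 0}),
]

report = run_pipeline('/cases/example.raw', demo_phases)
```

Only the phases themselves are sequential; the parallelism described above lives inside the collection phase.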
## Directory structure for the automation pipeline
mkdir -p /opt/memory-hunter/{scripts,templates,yara_rules,reports,logs}
## Full file layout:
## /opt/memory-hunter/
## analyse.py <- Main entry point (the one script to run)
## scripts/
## phase1_identify.py
## phase2_collect.py
## phase3_detect.py
## phase4_iocs.py
## phase5_report.py
## vol_runner.py <- Abstraction layer for vol2/vol3
## anomaly_checks.py <- Detection logic
## templates/
## report.html.j2 <- Jinja2 HTML report template
## yara_rules/
## combined.yar <- Compiled Yara ruleset
## reports/ <- Output destination
## logs/ <- Analysis logs
Phase 1: image identification and OS detection
## /opt/memory-hunter/scripts/phase1_identify.py
import subprocess
import json
import logging
import hashlib
from pathlib import Path
from typing import Dict, Optional
log = logging.getLogger(__name__)
VOL3_CMD = '/opt/vol3-env/bin/vol'
VOL2_CMD = '/opt/volatility2/vol.py'
def sha256_file(path: str) -> str:
    """Calculate SHA256 of image file."""
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            h.update(chunk)
    return h.hexdigest()
import shutil

def _run(cmd, timeout):
    """Run a command; return None on timeout or if the binary is missing."""
    try:
        return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except (subprocess.TimeoutExpired, OSError) as e:
        log.warning(f"Command failed ({cmd[0]}): {e}")
        return None

def identify_image(image_path: str) -> Dict:
    """
    Identify OS type, version, and architecture from memory image.
    Returns a metadata dict used by all subsequent phases.
    """
    path = Path(image_path)
    if not path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")
    size_gb = path.stat().st_size / (1024 ** 3)
    log.info(f"Image: {path.name} ({size_gb:.1f} GB)")
    result = {
        'path': str(path.absolute()),
        'filename': path.name,
        'size_gb': round(size_gb, 2),
        'sha256': sha256_file(image_path),
        'os_type': None,
        'os_version': None,
        'arch': None,
        'vol3_ok': False,
        'vol2_ok': False,
        'vol2_profile': None,
        'vol3_info': {},
        'errors': [],
    }
    # Try Volatility 3 first
    log.info("Running windows.info to identify OS")
    cmd = [VOL3_CMD, '-f', image_path, '--renderer', 'json', 'windows.info']
    r = _run(cmd, 120)
    if r and r.returncode == 0 and r.stdout.strip():
        try:
            data = json.loads(r.stdout)
            rows = data.get('rows', data) if isinstance(data, dict) else data
            # The JSON renderer emits one dict per row (Variable/Value);
            # list-style rows are accepted as well.
            info = {}
            for row in rows:
                if isinstance(row, dict):
                    info[str(row.get('Variable'))] = row.get('Value')
                elif isinstance(row, list) and len(row) >= 2:
                    info[row[0]] = row[1]
            result['os_type'] = 'windows'
            result['vol3_ok'] = True
            result['vol3_info'] = info
            result['os_version'] = (info.get('NtBuildLab') or info.get('Major/Minor')
                                    or info.get('Kernel Version', 'Unknown'))
            # Use the Is64Bit field rather than guessing from the kernel base address
            is64 = str(info.get('Is64Bit', '')).lower()
            result['arch'] = {'true': '64-bit', 'false': '32-bit'}.get(is64, 'Unknown')
            log.info(f"Identified: Windows {result['os_version']}")
            return result
        except Exception as e:
            result['errors'].append(f"Vol3 windows.info parse error: {e}")
    # Try Linux identification
    log.info("Trying linux.bash for Linux identification")
    cmd = [VOL3_CMD, '-f', image_path, '--renderer', 'json', 'linux.bash']
    r = _run(cmd, 60)
    if r and r.returncode == 0 and 'bash' in r.stdout.lower():
        result['os_type'] = 'linux'
        result['vol3_ok'] = True
        log.info("Identified: Linux image")
        return result
    # Fall back to Volatility 2 for profile-based identification.
    # Prefer the native checkout, otherwise the vol2 wrapper on PATH.
    vol2_cmd = None
    if Path(VOL2_CMD).exists() and shutil.which('python2'):
        vol2_cmd = ['python2', VOL2_CMD]
    elif shutil.which('vol2'):
        vol2_cmd = ['vol2']
    if vol2_cmd:
        log.info("Trying Volatility 2 imageinfo")
        r = _run(vol2_cmd + ['-f', image_path, 'imageinfo'], 300)
        for line in (r.stdout.splitlines() if r else []):
            if 'Suggested Profile' in line and ':' in line:
                profile = line.split(':', 1)[1].strip().split(',')[0].strip()
                if not profile or profile.lower().startswith('no suggestion'):
                    break
                result['vol2_ok'] = True
                result['vol2_profile'] = profile
                result['os_version'] = profile
                result['os_type'] = 'linux' if 'Linux' in profile else 'windows'
                log.info(f"Vol2 identified: {profile}")
                return result
        result['errors'].append("Vol2 imageinfo did not suggest a profile")
    result['errors'].append("Could not identify OS type from image")
    log.error("Image identification failed")
    return result
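The profile parsing used in the Volatility 2 fallback is easier to test when isolated. The helper below is a hedged sketch: parse_suggested_profiles is a hypothetical name, and the sample lines are hand-written to resemble imageinfo output rather than captured from a real run.

```python
import re
from typing import List


def parse_suggested_profiles(imageinfo_output: str) -> List[str]:
    """Return the profile names from the 'Suggested Profile(s)' line, if any."""
    for line in imageinfo_output.splitlines():
        if 'Suggested Profile' not in line or ':' not in line:
            continue
        value = line.split(':', 1)[1]
        # Drop trailing notes such as "(Instantiated with WinXPSP2x86)"
        value = re.sub(r'\(.*?\)', '', value)
        profiles = [p.strip() for p in value.split(',')]
        return [p for p in profiles
                if p and not p.lower().startswith('no suggestion')]
    return []


# Illustrative input only
sample = "          Suggested Profile(s) : Win7SP1x64, Win7SP0x64, Win2008R2SP0x64\n"
print(parse_suggested_profiles(sample))
```

Returning every candidate, rather than only the first, leaves room to retry with the next profile when a plugin fails under the first one.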
Phase 2: parallel plugin collection
## /opt/memory-hunter/scripts/vol_runner.py
## Abstraction layer that handles vol2/vol3 differences
import subprocess
import json
import logging
import concurrent.futures
from typing import List, Dict, Optional, Tuple
from pathlib import Path
log = logging.getLogger(__name__)
VOL3 = '/opt/vol3-env/bin/vol'
VOL2 = '/opt/volatility2/vol.py'
# Plugin definitions: (vol3_plugin, vol2_plugin, timeout_seconds, description)
WINDOWS_PLUGINS = [
# Core triage - always run
('windows.pslist', 'pslist', 120, 'Process list'),
('windows.pstree', 'pstree', 120, 'Process tree'),
('windows.cmdline', 'cmdline', 180, 'Command line arguments'),
('windows.netscan', 'netscan', 180, 'Network connections'),
('windows.netstat', None, 120, 'Network statistics'),
# Injection detection
('windows.malfind', 'malfind', 900, 'Injection detection'),
('windows.vadinfo', 'vadinfo', 300, 'VAD region analysis'),
('windows.dlllist', 'dlllist', 300, 'Loaded DLL list'),
# Persistence and privilege
('windows.svcscan', 'svcscan', 180, 'Windows services'),
('windows.scheduled_tasks', None, 120, 'Scheduled tasks'),
('windows.registry.hivelist', 'hivelist',120, 'Registry hives'),
('windows.registry.printkey', None, 120, 'Registry run keys'),
# Kernel integrity
('windows.callbacks', None, 180, 'Kernel callbacks'),
('windows.modules', 'modules', 120, 'Loaded kernel modules'),
('windows.driverscan', 'driverscan', 180, 'Driver scan'),
('windows.ssdt', 'ssdt', 120, 'SSDT entries'),
# Evidence and artefacts
('windows.handles', 'handles', 900, 'Open handles'),
# filescan lists file objects; dumpfiles would write every cached file to disk
('windows.filescan', 'filescan', 300, 'File objects'),
('windows.mftscan', None, 300, 'MFT entries'),
('windows.envars', 'envars', 120, 'Environment variables'),
# Credential access indicators
('windows.lsadump', None, 120, 'LSA credentials'),
('windows.hashdump', 'hashdump', 120, 'Password hashes'),
]
LINUX_PLUGINS = [
('linux.pslist', 'linux_pslist', 120, 'Process list'),
('linux.pstree', 'linux_pstree', 120, 'Process tree'),
('linux.bash', 'linux_bash', 120, 'Bash history'),
('linux.sockstat', 'linux_netstat', 120, 'Network connections'),
('linux.malfind', None, 600, 'Injection detection'),
('linux.lsmod', 'linux_lsmod', 120, 'Loaded kernel modules'),
('linux.check_syscall', 'linux_check_syscall', 180, 'Syscall table integrity'),
('linux.check_modules', 'linux_check_modules', 120, 'Module integrity'),
('linux.keyboard_notifiers', None, 120, 'Keyboard hooks (rootkit)'),
('linux.envars', None, 120, 'Environment variables'),
('linux.proc.Maps', None, 300, 'Process memory maps'),
]
# Vol2-only plugins that add unique value
VOL2_ONLY_WINDOWS = [
('mftparser', 240, 'MFT parser (Vol2 unique)'),
('shimcache', 120, 'Shimcache (Vol2 unique)'),
('prefetchparser', 120, 'Prefetch (Vol2 unique)'),
('iehistory', 120, 'IE history (Vol2 unique)'),
]
VOL2_ONLY_LINUX = [
('linux_psaux', 120, 'Full process args (Vol2 unique)'),
('linux_check_afinfo', 120, 'Network hook detection (Vol2 unique)'),
]
def run_vol3(image_path: str, plugin: str, timeout: int = 300,
             extra_args: List[str] = None) -> Tuple[str, List]:
    """Run a Volatility 3 plugin, return (plugin_name, results_list)."""
    cmd = [VOL3, '-f', image_path, '--renderer', 'json', plugin]
    if extra_args:
        cmd.extend(extra_args)
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if r.returncode != 0 or not r.stdout.strip():
            return plugin, []
        data = json.loads(r.stdout)
        if isinstance(data, dict):
            return plugin, data.get('rows', [])
        return plugin, data if isinstance(data, list) else []
    except subprocess.TimeoutExpired:
        log.warning(f"TIMEOUT: {plugin} ({timeout}s)")
        return plugin, []
    except Exception as e:
        log.debug(f"Error in {plugin}: {e}")
        return plugin, []

def run_vol2(image_path: str, plugin: str, profile: str,
             timeout: int = 300, extra_args: List[str] = None) -> Tuple[str, str]:
    """Run a Volatility 2 plugin, return (plugin_name, raw_text_output)."""
    if not Path(VOL2).exists():
        return plugin, ''
    cmd = ['python2', VOL2, '-f', image_path,
           f'--profile={profile}', plugin]
    if extra_args:
        cmd.extend(extra_args)
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        return plugin, r.stdout
    except subprocess.TimeoutExpired:
        log.warning(f"TIMEOUT (vol2): {plugin}")
        return plugin, ''
    except Exception as e:
        log.debug(f"Vol2 error in {plugin}: {e}")
        return plugin, ''
def collect_all_plugins(image_path: str, image_info: Dict,
                        max_workers: int = 8) -> Dict:
    """
    Run all applicable plugins in parallel.
    Returns dict of plugin_name -> results.
    """
    os_type = image_info.get('os_type') or 'windows'
    vol2_ok = bool(image_info.get('vol2_ok'))
    # Only a Volatility 2 profile is valid for --profile. os_version holds a
    # profile name only when Volatility 2 identified the image.
    profile = image_info.get('vol2_profile') or \
        (image_info.get('os_version') if vol2_ok else '') or ''
    plugins = WINDOWS_PLUGINS if os_type == 'windows' else LINUX_PLUGINS
    vol2_only = VOL2_ONLY_WINDOWS if os_type == 'windows' else VOL2_ONLY_LINUX
    results = {}
    done = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit plugins - prefer vol2 when vol3 symbol lookup failed (old images)
        futures = {}
        vol3_working = image_info.get('vol3_ok') and bool(image_info.get('vol3_info'))
        for vol3_plugin, vol2_plugin, timeout, desc in plugins:
            if vol3_working and vol3_plugin:
                f = executor.submit(run_vol3, image_path, vol3_plugin, timeout)
                futures[f] = (vol3_plugin, desc)
            elif vol2_ok and vol2_plugin and profile:
                # Vol3 symbols missing - fall back to vol2
                f = executor.submit(run_vol2, image_path, vol2_plugin,
                                    profile, timeout)
                futures[f] = (vol2_plugin, desc)
            elif image_info.get('vol3_ok') and vol3_plugin:
                # Try vol3 anyway even without confirmed symbols
                f = executor.submit(run_vol3, image_path, vol3_plugin, timeout)
                futures[f] = (vol3_plugin, desc)
        # Submit vol2-only plugins; they cannot run without a profile
        if profile and (vol2_ok or Path(VOL2).exists()):
            for vol2_plugin, timeout, desc in vol2_only:
                f = executor.submit(run_vol2, image_path, vol2_plugin,
                                    profile, timeout)
                futures[f] = (vol2_plugin, desc)
        else:
            log.info("Skipping Vol2-only plugins (no Volatility 2 profile)")
        # Count what was actually submitted, not what was defined
        total = len(futures)
        log.info(f"Running {total} plugins with {max_workers} parallel workers")
        # Collect results as they complete
        for future in concurrent.futures.as_completed(futures):
            plugin_name, desc = futures[future]
            done += 1
            try:
                name, data = future.result()
                results[name] = data
                count = len(data) if isinstance(data, list) else len(data.splitlines())
                log.info(f" [{done}/{total}] {desc}: {count} records")
            except Exception as e:
                log.error(f"Plugin {plugin_name} failed: {e}")
                results[plugin_name] = []
    return results
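One detail worth handling before the detection phase: tree-shaped plugins such as pstree nest child rows under their parents in JSON output. The sketch below flattens that nesting into a simple list. It assumes each row is a dict carrying its children under a `__children` key, which is how the Volatility 3 JSON renderer is understood to structure its output; flatten_rows and the sample data are illustrative.

```python
from typing import Dict, Iterator, List


def flatten_rows(rows: List[Dict], depth: int = 0) -> Iterator[Dict]:
    """Yield every row in a nested result tree, parents before children."""
    for row in rows:
        if not isinstance(row, dict):
            continue  # ignore anything that is not a row dict
        children = row.get('__children') or []
        flat = {k: v for k, v in row.items() if k != '__children'}
        flat['_depth'] = depth  # record nesting level for later display
        yield flat
        yield from flatten_rows(children, depth + 1)


# Hand-written sample in the assumed shape
sample = [{
    'PID': 4, 'PPID': 0, 'ImageFileName': 'System',
    '__children': [
        {'PID': 300, 'PPID': 4, 'ImageFileName': 'smss.exe', '__children': []},
    ],
}]
flat = list(flatten_rows(sample))
```

Working on flat dict rows also lets the anomaly checks address columns by name instead of by position.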
Phase 3: anomaly detection and scoring
## /opt/memory-hunter/scripts/anomaly_checks.py
## Applies detection logic across collected plugin results
import re
import logging
from typing import Dict, List, Tuple
log = logging.getLogger(__name__)
# Legitimate system processes and their expected parents on Windows
WINDOWS_PROCESS_RULES = {
'system': {'expected_parents': [''], 'expected_path': ''},
'smss.exe': {'expected_parents': ['System'], 'expected_path': r'windows\system32'},
'csrss.exe': {'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'wininit.exe': {'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'winlogon.exe':{'expected_parents': ['smss.exe'], 'expected_path': r'windows\system32'},
'services.exe':{'expected_parents': ['wininit.exe'],'expected_path': r'windows\system32'},
'lsass.exe': {'expected_parents': ['wininit.exe'],'expected_path': r'windows\system32'},
'svchost.exe': {'expected_parents': ['services.exe','msiexec.exe'],
'expected_path': r'windows\system32'},
'explorer.exe':{'expected_parents': ['userinit.exe',''],
'expected_path': r'windows'},
'taskhost.exe':{'expected_parents': ['services.exe'],'expected_path': r'windows\system32'},
'spoolsv.exe': {'expected_parents': ['services.exe'],'expected_path': r'windows\system32'},
}
# Office apps that should never spawn shells
SUSPICIOUS_PARENT_CHILD = {
'winword.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe', 'cscript.exe',
'mshta.exe', 'regsvr32.exe', 'rundll32.exe'],
'excel.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe', 'cscript.exe'],
'powerpnt.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'outlook.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'acrord32.exe': ['cmd.exe', 'powershell.exe', 'wscript.exe'],
'wmiprvse.exe': ['cmd.exe', 'powershell.exe'],
}
# Paths that are always suspicious for system binaries
SUSPICIOUS_PATHS = [
r'\\temp\\', r'\\tmp\\', r'\\appdata\\',
r'\\public\\', r'\\downloads\\', r'\\desktop\\',
r'\\programdata\\', r'\\$recycle'
]
SUSPICIOUS_PORTS = {4444, 8080, 8443, 1337, 31337, 9001, 6667, 4545}
class Finding:
    def __init__(self, severity: str, category: str, title: str,
                 detail: str, pid: int = None, process: str = None):
        self.severity = severity  # CRITICAL / HIGH / MEDIUM / LOW
        self.category = category
        self.title = title
        self.detail = detail
        self.pid = pid
        self.process = process
        self.score = {'CRITICAL': 40, 'HIGH': 20, 'MEDIUM': 10, 'LOW': 5}[severity]

    def to_dict(self):
        return {
            'severity': self.severity,
            'category': self.category,
            'title': self.title,
            'detail': self.detail,
            'pid': self.pid,
            'process': self.process,
            'score': self.score,
        }
def _image_path_from_cmd(cmd: str) -> str:
    """Best-effort executable path taken from the start of a command line."""
    cmd = (cmd or '').strip().lower()
    if cmd.startswith('"'):
        return cmd[1:].split('"', 1)[0]
    idx = cmd.find('.exe')
    return cmd[:idx + 4] if idx != -1 else cmd.split(' ', 1)[0]

def check_processes(pslist: List, pstree: List, cmdline: List) -> List[Finding]:
    """Detect process anomalies: masquerading, suspicious parents, unusual paths."""
    findings = []
    # Build lookup dicts
    pid_to_name = {}
    pid_to_ppid = {}
    pid_to_path = {}
    pid_to_cmd = {}
    for proc in pslist:
        if isinstance(proc, list):
            # List rows: [PID, PPID, ImageFileName, Offset, Threads, Handles,
            #             SessionId, Wow64, CreateTime, ExitTime, File output]
            pid = proc[0] if len(proc) > 0 else 0
            ppid = proc[1] if len(proc) > 1 else 0
            name = str(proc[2] if len(proc) > 2 else '').lower()
            path = ''  # pslist has no path column ("File output" is not a path)
        elif isinstance(proc, dict):
            pid = proc.get('PID', proc.get('pid', 0))
            ppid = proc.get('PPID', proc.get('ppid', 0))
            name = str(proc.get('ImageFileName', proc.get('name', '')) or '').lower()
            path = str(proc.get('Path', proc.get('path', '')) or '').lower()
        else:
            continue
        pid_to_name[pid] = name
        pid_to_ppid[pid] = ppid
        pid_to_path[pid] = path
    for cmd_entry in cmdline:
        if isinstance(cmd_entry, list) and len(cmd_entry) >= 3:
            pid_to_cmd[cmd_entry[0]] = str(cmd_entry[2] or '')
        elif isinstance(cmd_entry, dict):
            pid_to_cmd[cmd_entry.get('PID', 0)] = str(cmd_entry.get('Args') or '')
    # Check each process
    for pid, name in pid_to_name.items():
        ppid = pid_to_ppid.get(pid, 0)
        parent = pid_to_name.get(ppid, '')
        cmd = pid_to_cmd.get(pid, '')
        cmd_lower = cmd.lower()
        # pslist carries no image path, so fall back to the command line
        path = pid_to_path.get(pid) or _image_path_from_cmd(cmd)
        # 1. Process in suspicious location (one finding per process)
        doubled = path.replace('\\', '\\\\')
        if any(sus_path in doubled for sus_path in SUSPICIOUS_PATHS):
            if name in WINDOWS_PROCESS_RULES:
                findings.append(Finding(
                    'CRITICAL', 'process_masquerade',
                    'System process in suspicious location',
                    f'{name} (PID {pid}) running from: {path}',
                    pid, name
                ))
            elif name.endswith('.exe'):
                findings.append(Finding(
                    'HIGH', 'suspicious_path',
                    'Executable in staging location',
                    f'{name} (PID {pid}) at {path}',
                    pid, name
                ))
        # 2. Suspicious parent-child pairs (WMI is reported separately in check 4)
        if parent != 'wmiprvse.exe' and name in SUSPICIOUS_PARENT_CHILD.get(parent, []):
            findings.append(Finding(
                'CRITICAL', 'suspicious_spawn',
                'Office/PDF app spawned shell',
                f'{parent} (PPID {ppid}) spawned {name} (PID {pid})\nCmd: {cmd[:200]}',
                pid, name
            ))
        # 3. Encoded PowerShell
        if name == 'powershell.exe' and cmd:
            if any(enc in cmd_lower for enc in ['-enc', '-encodedcommand', '-e ']):
                findings.append(Finding(
                    'HIGH', 'encoded_powershell',
                    'PowerShell with encoded command',
                    f'PID {pid}: {cmd[:300]}',
                    pid, name
                ))
            if any(sus in cmd_lower for sus in [
                'downloadstring', 'downloadfile', 'webclient',
                'invoke-expression', 'iex ', 'frombase64'
            ]):
                findings.append(Finding(
                    'HIGH', 'ps_download_cradle',
                    'PowerShell download cradle detected',
                    f'PID {pid}: {cmd[:300]}',
                    pid, name
                ))
        # 4. WMI execution chain
        if parent == 'wmiprvse.exe' and name in ['cmd.exe', 'powershell.exe',
                                                 'wscript.exe', 'cscript.exe']:
            findings.append(Finding(
                'HIGH', 'wmi_execution',
                'WMI spawned command interpreter',
                f'WmiPrvSE spawned {name} (PID {pid}): {cmd[:200]}',
                pid, name
            ))
    return findings
def check_malfind(malfind_results: List) -> List[Finding]:
    """Score and categorise malfind results."""
    findings = []
    for region in malfind_results:
        if isinstance(region, dict):
            pid = region.get('PID', region.get('Pid', 0))
            name = region.get('Process', region.get('ImageFileName', ''))
            start = region.get('Start VPN', region.get('Start', region.get('VadStart', 0)))
            prot = region.get('Protection', '')
            hexd = str(region.get('Hexdump', region.get('Data', '')))
        elif isinstance(region, list):
            # Assumed column order: PID, Process, Start VPN, End VPN, Tag,
            # Protection, CommitCharge, PrivateMemory, File output, Hexdump.
            # Positions can shift between Volatility 3 releases.
            pid = region[0] if len(region) > 0 else 0
            name = region[1] if len(region) > 1 else ''
            start = region[2] if len(region) > 2 else 0
            prot = region[5] if len(region) > 5 else ''
            hexd = str(region[9]) if len(region) > 9 else ''
        else:
            continue
        prot_s = str(prot)
        addr = f'0x{start:x}' if isinstance(start, int) else str(start)
        has_pe = hexd.strip().startswith('4d 5a') or hexd.strip().startswith('MZ')
        # Windows reports PAGE_EXECUTE_READWRITE; Linux malfind reports rwx
        is_rwx = 'EXECUTE_READWRITE' in prot_s.upper() or prot_s.lower() == 'rwx'
        if has_pe and is_rwx:
            findings.append(Finding(
                'CRITICAL', 'injection',
                'PE file in RWX anonymous memory (reflective loading)',
                f'PID {pid} ({name}): addr={addr} protection={prot_s}',
                pid, name
            ))
        elif has_pe:
            findings.append(Finding(
                'HIGH', 'injection',
                'PE header in executable anonymous memory',
                f'PID {pid} ({name}): addr={addr} protection={prot_s}',
                pid, name
            ))
        elif is_rwx:
            findings.append(Finding(
                'HIGH', 'injection',
                'RWX anonymous memory region (shellcode staging)',
                f'PID {pid} ({name}): addr={addr}',
                pid, name
            ))
    return findings
import ipaddress

def _is_external(addr: str) -> bool:
    """True only for a parseable, globally routable IPv4 or IPv6 address."""
    try:
        return ipaddress.ip_address(addr).is_global
    except ValueError:
        return False  # '*', empty strings, hostnames

def _to_int(value) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0

def check_network(netscan_results: List) -> List[Finding]:
    """Detect suspicious network activity."""
    findings = []
    system_procs = {'svchost.exe', 'lsass.exe', 'csrss.exe',
                    'winlogon.exe', 'services.exe'}
    for conn in netscan_results:
        if isinstance(conn, dict):
            remote = str(conn.get('ForeignAddr', conn.get('RemoteAddr', '')))
            rport = _to_int(conn.get('ForeignPort', conn.get('RemotePort')))
            state = str(conn.get('State', ''))
            pid = _to_int(conn.get('PID', conn.get('Pid')))
            name = str(conn.get('Owner', conn.get('Process', '')))
        elif isinstance(conn, list) and len(conn) > 8:
            # Assumed column order: Offset, Proto, LocalAddr, LocalPort,
            # ForeignAddr, ForeignPort, State, PID, Owner, Created
            remote, rport, state = str(conn[4]), _to_int(conn[5]), str(conn[6])
            pid, name = _to_int(conn[7]), str(conn[8])
        else:
            continue
        if 'ESTABLISHED' not in state:
            continue
        # Tolerate "ip:port" strings; bare IPv6 addresses contain several colons
        remote_ip = remote.split(':')[0] if remote.count(':') == 1 else remote
        if not _is_external(remote_ip):
            continue
        if rport in SUSPICIOUS_PORTS:
            findings.append(Finding(
                'HIGH', 'suspicious_network',
                'Connection to external IP on known C2 port',
                f'{name} (PID {pid}) -> {remote_ip}:{rport}',
                pid, name
            ))
        elif name.lower() in system_procs:
            findings.append(Finding(
                'HIGH', 'suspicious_network',
                'System process with external network connection',
                f'{name} (PID {pid}) -> {remote_ip}:{rport}',
                pid, name
            ))
    return findings
def check_services(svcscan: List) -> List[Finding]:
    """Detect suspicious service configurations."""
    findings = []
    # Single backslashes: these are compared directly against the binary path
    suspicious_paths = ['\\temp\\', '\\tmp\\', '\\appdata\\',
                        '\\public\\', '\\programdata\\']
    for svc in svcscan:
        if isinstance(svc, dict):
            name = str(svc.get('ServiceName', svc.get('Name', '')))
            binary = str(svc.get('Binary', svc.get('Path', '')))
            state = str(svc.get('State', ''))
        elif isinstance(svc, list):
            # Assumed column order: Offset, Order, PID, Start, State, Type,
            # Name, Display, Binary
            state = str(svc[4]) if len(svc) > 4 else ''
            name = str(svc[6]) if len(svc) > 6 else ''
            binary = str(svc[8]) if len(svc) > 8 else ''
        else:
            continue
        binary_lower = binary.lower()
        # One finding per service, even if several patterns match
        if any(sus in binary_lower for sus in suspicious_paths):
            findings.append(Finding(
                'HIGH', 'suspicious_service',
                'Service binary in staging location',
                f'Service: {name} | Binary: {binary}',
                None, name
            ))
        if 'powershell' in binary_lower or 'cmd.exe /c' in binary_lower:
            findings.append(Finding(
                'HIGH', 'suspicious_service',
                'Service using interpreter as binary',
                f'Service: {name} | Binary: {binary}',
                None, name
            ))
    return findings
def check_kernel_integrity(callbacks: List, modules: List,
                           driverscan: List) -> List[Finding]:
    """Detect kernel-level tampering indicators."""
    findings = []
    # Known legitimate callback registrants, compared without file extension.
    # Exact matching avoids 'nt' matching every module name that contains it.
    known_callbacks = {'ntoskrnl', 'nt', 'win32k', 'ndis',
                       'tcpip', 'fltmgr', 'ci'}
    for cb in callbacks:
        if isinstance(cb, list):
            callback_type = str(cb[0]) if len(cb) > 0 else ''
            module = str(cb[2]) if len(cb) > 2 else ''
        elif isinstance(cb, dict):
            callback_type = str(cb.get('Type', ''))
            module = str(cb.get('Module', ''))
        else:
            continue
        stem = module.lower().rsplit('.', 1)[0]
        if module and stem not in known_callbacks:
            findings.append(Finding(
                'MEDIUM', 'kernel_callback',
                'Unknown kernel callback registration',
                f'Type: {callback_type} | Module: {module}',
            ))
    # Check for hidden modules (in driverscan but not modules).
    # Compare like with like: module Base against driver Start address.
    module_bases = set()
    for mod in modules:
        if isinstance(mod, list) and len(mod) > 1:
            module_bases.add(str(mod[1]))
        elif isinstance(mod, dict):
            module_bases.add(str(mod.get('Base', '')))
    for drv in driverscan:
        if isinstance(drv, dict):
            base = str(drv.get('Start', ''))
            name = str(drv.get('Name', drv.get('Driver Name', '')))
        elif isinstance(drv, list) and len(drv) > 1:
            # Assumed column order: Offset, Start, Size, Service Key,
            # Driver Name, Name
            base = str(drv[1])
            name = str(drv[5]) if len(drv) > 5 else ''
        else:
            continue
        if base and base not in module_bases:
            findings.append(Finding(
                'HIGH', 'hidden_driver',
                'Driver found by scan not in module list (possible rootkit)',
                f'Name: {name} | Base: {base}',
            ))
    return findings
def run_all_checks(plugin_results: Dict) -> Tuple[List[Finding], int]:
    """Run all anomaly checks and return findings with total risk score."""
    all_findings = []
    all_findings.extend(check_processes(
        plugin_results.get('windows.pslist', []),
        plugin_results.get('windows.pstree', []),
        plugin_results.get('windows.cmdline', []),
    ))
    all_findings.extend(check_malfind(
        plugin_results.get('windows.malfind', []) +
        plugin_results.get('linux.malfind', [])
    ))
    all_findings.extend(check_network(
        plugin_results.get('windows.netscan', []) +
        plugin_results.get('linux.netstat', [])
    ))
    all_findings.extend(check_services(
        plugin_results.get('windows.svcscan', [])
    ))
    all_findings.extend(check_kernel_integrity(
        plugin_results.get('windows.callbacks', []),
        plugin_results.get('windows.modules', []),
        plugin_results.get('windows.driverscan', []),
    ))
    total_score = sum(f.score for f in all_findings)
    all_findings.sort(key=lambda f: f.score, reverse=True)
    return all_findings, total_score
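The rules table at the top of this module lists expected parents for system processes, and a parent check is a natural companion to the path checks. The following is a minimal sketch under stated assumptions: unexpected_parents is a hypothetical helper, RULES is a small subset copied in so the example is self-contained, and processes whose parent has already exited are skipped rather than judged.

```python
from typing import Dict, List

# Subset of expected-parent rules, duplicated here for a self-contained sketch
RULES: Dict[str, List[str]] = {
    'services.exe': ['wininit.exe'],
    'lsass.exe': ['wininit.exe'],
    'svchost.exe': ['services.exe', 'msiexec.exe'],
}


def unexpected_parents(pid_to_name: Dict[int, str],
                       pid_to_ppid: Dict[int, int],
                       rules: Dict[str, List[str]] = RULES) -> List[Dict]:
    """Return processes whose live parent is not in their expected-parent list."""
    hits = []
    for pid, name in pid_to_name.items():
        expected = rules.get(name.lower())
        if not expected:
            continue  # no rule for this process name
        ppid = pid_to_ppid.get(pid, 0)
        parent = pid_to_name.get(ppid, '').lower()
        if not parent:
            continue  # parent exited or unknown; cannot judge from pslist alone
        if parent not in [p.lower() for p in expected]:
            hits.append({'pid': pid, 'name': name, 'ppid': ppid, 'parent': parent})
    return hits


names = {500: 'wininit.exe', 600: 'services.exe', 700: 'svchost.exe',
         800: 'svchost.exe', 900: 'explorer.exe'}
ppids = {500: 0, 600: 500, 700: 600, 800: 900, 900: 0}
hits = unexpected_parents(names, ppids)
```

In this sample only PID 800 is reported, because its svchost.exe was started by explorer.exe rather than services.exe. Each hit could be wrapped in a Finding in the same way as the other checks.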
Phase 4: IOC extraction and Yara scanning
## /opt/memory-hunter/scripts/phase4_iocs.py
import re
import subprocess
import json
import logging
from typing import Dict, List, Set
from pathlib import Path
log = logging.getLogger(__name__)
VOL3 = '/opt/vol3-env/bin/vol'
YARA_RULES = '/opt/memory-hunter/yara_rules/combined.yar'
# Patterns for IOC extraction
PATTERNS = {
'ipv4': re.compile(r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b'),
'url': re.compile(r'https?://[a-zA-Z0-9._/?=&%+-]{10,300}'),
'domain': re.compile(r'\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.){1,10}(?:com|net|org|io|co|xyz|top|tk|ru|cn|de|info|biz)\b', re.I),
'email': re.compile(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b'),
'hash_md5': re.compile(r'\b[a-fA-F0-9]{32}\b'),
'hash_sha256': re.compile(r'\b[a-fA-F0-9]{64}\b'),
'registry_run':re.compile(r'SOFTWARE\\(?:Microsoft\\Windows\\CurrentVersion\\Run|Wow6432Node)[^"\'\\]+', re.I),
'pipe': re.compile(r'\\\\\.\\pipe\\[a-zA-Z0-9_-]{4,}'),
'base64_large':re.compile(r'[A-Za-z0-9+/]{100,}={0,2}'),
}
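# Prefixes treated as internal. The RFC 1918 block 172.16.0.0/12 spans
# 172.16.x.x through 172.31.x.x, so every second octet in that range is listed.
INTERNAL_IPS = (['10.', '192.168.', '127.', '169.254.'] +
                [f'172.{n}.' for n in range(16, 32)])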
LEGIT_DOMAINS = ['microsoft.com', 'windows.com', 'windowsupdate.com',
'google.com', 'akamai.com', 'cloudflare.com',
'amazon.com', 'amazonaws.com']
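def extract_strings_from_memory(image_path: str,
                                suspicious_pids: List[int]) -> str:
    """Extract strings from suspicious process memory regions."""
    if not suspicious_pids:
        return ''
    # Volatility 3 list options take space-separated values, so each PID is
    # passed as its own argument rather than as one comma-joined string.
    pid_args = ['--pid'] + [str(p) for p in suspicious_pids[:10]]
    # NOTE: windows.strings maps already-extracted strings to processes and
    # expects a --strings-file input; confirm with `vol windows.strings -h`.
    cmd = [VOL3, '-f', image_path, '--renderer', 'json',
           'windows.strings'] + pid_args
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        log.warning("windows.strings timed out - skipping string extraction")
        return ''
    return r.stdout if r.returncode == 0 else ''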
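def run_yara_scan(image_path: str) -> List[Dict]:
    """Run Yara rules against process memory via Volatility."""
    if not Path(YARA_RULES).exists():
        log.warning("Yara rules not found - skipping Yara scan")
        return []
    # combined.yar is a compiled ruleset (yara.compile(...).save(...)), so it is
    # passed with --yara-compiled-file; --yara-file expects rule source text.
    # Check `vol windows.vadyarascan -h` if your build names the option differently.
    cmd = [VOL3, '-f', image_path, '--renderer', 'json',
           'windows.vadyarascan', '--yara-compiled-file', YARA_RULES]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    except subprocess.TimeoutExpired:
        log.warning("Yara scan timed out after 600 seconds")
        return []
    if r.returncode != 0:
        log.warning("Yara scan failed: %s", r.stderr.strip()[:200])
        return []
    try:
        data = json.loads(r.stdout)
    except json.JSONDecodeError:
        log.warning("Yara scan output was not valid JSON")
        return []
    rows = data.get('rows', []) if isinstance(data, dict) else data
    return rows if isinstance(rows, list) else []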
def extract_iocs(plugin_results: Dict, image_path: str,
                 suspicious_pids: List[int]) -> Dict:
    """Extract all IOC types from plugin results and process memory strings."""
    iocs: Dict[str, Set] = {k: set() for k in PATTERNS}
    iocs['yara_hits'] = set()
    # Extract from network connections
    for conn in plugin_results.get('windows.netscan', []):
        remote = ''
        if isinstance(conn, list):
            remote = str(conn[3]) if len(conn) > 3 else ''
        elif isinstance(conn, dict):
            remote = str(conn.get('ForeignAddr', ''))
        # Keep only dotted-quad IPv4 values. Matching from the start of the field
        # strips any ':port' suffix and skips IPv6 addresses, '*' and empty values.
        m = PATTERNS['ipv4'].match(remote.strip())
        remote_ip = m.group(0) if m else ''
        if (remote_ip and remote_ip != '0.0.0.0'
                and not any(remote_ip.startswith(r) for r in INTERNAL_IPS)):
            iocs['ipv4'].add(remote_ip)
    # Extract from command lines
    for cmd_entry in plugin_results.get('windows.cmdline', []):
        cmd_text = ''
        if isinstance(cmd_entry, list) and len(cmd_entry) > 2:
            cmd_text = str(cmd_entry[2])
        elif isinstance(cmd_entry, dict):
            cmd_text = str(cmd_entry.get('Args', ''))
        for ioc_type, pattern in PATTERNS.items():
            for match in pattern.findall(cmd_text):
                if ioc_type == 'ipv4' and not any(match.startswith(r) for r in INTERNAL_IPS):
                    iocs[ioc_type].add(match)
                elif ioc_type == 'url' and not any(d in match for d in LEGIT_DOMAINS):
                    iocs[ioc_type].add(match[:200])
                elif ioc_type not in ('ipv4', 'url'):
                    iocs[ioc_type].add(match[:200])
    # Extract from process memory strings (URLs, IPs and pipe names only,
    # because the other patterns are too noisy against raw memory strings)
    if suspicious_pids:
        strings_output = extract_strings_from_memory(image_path, suspicious_pids)
        for ioc_type, pattern in PATTERNS.items():
            for match in pattern.findall(strings_output):
                if ioc_type == 'url' and not any(d in match for d in LEGIT_DOMAINS):
                    iocs[ioc_type].add(match[:200])
                elif ioc_type == 'ipv4' and not any(match.startswith(r) for r in INTERNAL_IPS):
                    iocs[ioc_type].add(match)
                elif ioc_type == 'pipe' and 'pipe\\' in match.lower():
                    iocs[ioc_type].add(match)
    # Yara scan
    yara_hits = run_yara_scan(image_path)
    for hit in yara_hits:
        if isinstance(hit, list):
            rule = str(hit[2]) if len(hit) > 2 else ''
            pid = str(hit[0]) if len(hit) > 0 else ''
            proc = str(hit[1]) if len(hit) > 1 else ''
        elif isinstance(hit, dict):
            rule = str(hit.get('Rule', ''))
            pid = str(hit.get('PID', ''))
            proc = str(hit.get('Process', ''))
        else:
            continue
        if rule:
            iocs['yara_hits'].add(f"{rule} (PID {pid} / {proc})")
            log.warning(f"YARA HIT: {rule} in PID {pid} ({proc})")
    # Sets are converted to sorted lists so the result is JSON-serialisable
    return {k: sorted(v) for k, v in iocs.items()}
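The prefix and substring filters in this listing are simple but easy to slip past: a URL such as http://evil.example/?ref=microsoft.com contains an allow-listed domain as a substring, and string prefixes only approximate address ranges. As a hedged alternative, the sketch below uses the standard library's ipaddress and urllib.parse modules. The helper names is_external_ip and is_allowlisted_url are hypothetical and not part of the script above; LEGIT_DOMAINS is copied from the listing.

```python
import ipaddress
from urllib.parse import urlparse

# Copied from phase4_iocs.py so the sketch is self-contained
LEGIT_DOMAINS = ['microsoft.com', 'windows.com', 'windowsupdate.com',
                 'google.com', 'akamai.com', 'cloudflare.com',
                 'amazon.com', 'amazonaws.com']


def is_external_ip(value: str) -> bool:
    """True only for syntactically valid, globally routable addresses."""
    try:
        addr = ipaddress.ip_address(value.strip())
    except ValueError:
        return False  # not an IP address at all
    return addr.is_global


def is_allowlisted_url(url: str) -> bool:
    """True when the URL's host is an allow-listed domain or a subdomain of one."""
    host = (urlparse(url).hostname or '').lower()
    return any(host == d or host.endswith('.' + d) for d in LEGIT_DOMAINS)
```

Swapping these in for the startswith and substring tests would mean an address is kept when is_external_ip returns True and a URL is kept when is_allowlisted_url returns False. Whether the extra strictness is worth it depends on how noisy your images are.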
Phase 5: report generation (HTML and JSON)
## /opt/memory-hunter/scripts/phase5_report.py
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from jinja2 import Template
log = logging.getLogger(__name__)
REPORT_DIR = Path('/opt/memory-hunter/reports')
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Memory Analysis: {{ meta.filename }}</title>
<style>
  * { box-sizing: border-box; margin: 0; padding: 0; }
  body { font-family: 'Segoe UI', system-ui, sans-serif; background: #f1f5f9;
         color: #0f172a; font-size: 14px; }
  .header { background: #0f172a; color: #e2e8f0; padding: 24px 32px; }
  .header h1 { font-size: 20px; font-weight: 700; }
  .header .meta { font-family: monospace; font-size: 11px; color: #64748b;
                  margin-top: 6px; }
  .risk-banner { padding: 14px 32px; font-weight: 700; font-size: 15px; }
  .risk-CRITICAL { background: #450a0a; color: #fca5a5; }
  .risk-HIGH { background: #431407; color: #fed7aa; }
  .risk-MEDIUM { background: #422006; color: #fde68a; }
  .risk-LOW { background: #052e16; color: #86efac; }
  .container { padding: 24px 32px; max-width: 1400px; }
  .section { background: white; border-radius: 10px; margin-bottom: 20px;
             border: 1px solid #e2e8f0; overflow: hidden; }
  .section-header { padding: 14px 18px; background: #f8fafc;
                    border-bottom: 1px solid #e2e8f0; font-weight: 600;
                    font-size: 13px; display: flex; align-items: center; gap: 10px; }
  .count-badge { background: #1e293b; color: #94a3b8; padding: 2px 8px;
                 border-radius: 4px; font-family: monospace; font-size: 10px; }
  .finding { padding: 14px 18px; border-bottom: 1px solid #f1f5f9; }
  .finding:last-child { border-bottom: none; }
  .finding-header { display: flex; align-items: center; gap: 10px;
                    margin-bottom: 6px; }
  .sev { padding: 2px 8px; border-radius: 4px; font-size: 10px;
         font-weight: 700; font-family: monospace; }
  .sev-CRITICAL { background: #450a0a; color: #fca5a5; border: 1px solid #ef4444; }
  .sev-HIGH { background: #431407; color: #fed7aa; border: 1px solid #f97316; }
  .sev-MEDIUM { background: #422006; color: #fde68a; border: 1px solid #eab308; }
  .sev-LOW { background: #052e16; color: #86efac; border: 1px solid #22c55e; }
  .finding-title { font-weight: 600; font-size: 13px; }
  .finding-detail { font-family: monospace; font-size: 11px; color: #475569;
                    background: #f8fafc; padding: 8px 10px; border-radius: 4px;
                    margin-top: 6px; white-space: pre-wrap; word-break: break-all; }
  .ioc-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(300px,1fr));
              gap: 12px; padding: 16px 18px; }
  .ioc-group { }
  .ioc-label { font-size: 9px; font-family: monospace; text-transform: uppercase;
               letter-spacing: 0.1em; color: #94a3b8; margin-bottom: 6px; }
  .ioc-value { font-family: monospace; font-size: 11px; color: #0369a1;
               background: #e0f2fe; padding: 3px 8px; border-radius: 3px;
               margin-bottom: 3px; word-break: break-all; }
  .stats { display: grid; grid-template-columns: repeat(5, 1fr);
           gap: 12px; margin-bottom: 20px; }
  .stat-card { background: white; border-radius: 8px; padding: 16px;
               border: 1px solid #e2e8f0; text-align: center; }
  .stat-val { font-size: 28px; font-weight: 800; }
  .stat-lbl { font-size: 10px; color: #94a3b8; text-transform: uppercase;
              letter-spacing: 0.1em; margin-top: 4px; }
  .val-critical { color: #ef4444; } .val-high { color: #f97316; }
  .val-medium { color: #eab308; } .val-low { color: #22c55e; }
  table { width: 100%; border-collapse: collapse; }
  th { text-align: left; padding: 8px 12px; font-size: 10px; color: #64748b;
       text-transform: uppercase; letter-spacing: 0.08em;
       border-bottom: 1px solid #e2e8f0; background: #f8fafc; }
  td { padding: 8px 12px; border-bottom: 1px solid #f1f5f9;
       font-family: monospace; font-size: 11px; }
  tr:last-child td { border-bottom: none; }
  .tag { display: inline-block; padding: 1px 6px; border-radius: 3px;
         font-size: 9px; font-family: monospace; }
  .tag-ext { background: #fee2e2; color: #991b1b; }
  .tag-sus { background: #fef3c7; color: #92400e; }
</style>
</head>
<body>
<div class="header">
  <h1>Memory Analysis Report: {{ meta.filename }}</h1>
</div>
<div class="container">
  <div class="stats">
    <div class="stat-card">
      <div class="stat-val val-critical">{{ findings|selectattr('severity','eq','CRITICAL')|list|length }}</div>
      <div class="stat-lbl">Critical</div>
    </div>
    <div class="stat-card">
      <div class="stat-val val-high">{{ findings|selectattr('severity','eq','HIGH')|list|length }}</div>
      <div class="stat-lbl">High</div>
    </div>
    <div class="stat-card">
      <div class="stat-val val-medium">{{ findings|selectattr('severity','eq','MEDIUM')|list|length }}</div>
      <div class="stat-lbl">Medium</div>
    </div>
    <div class="stat-card">
      <div class="stat-val">{{ iocs.yara_hits|length }}</div>
      <div class="stat-lbl">Yara Hits</div>
    </div>
    <div class="stat-card">
      <div class="stat-val">{{ (iocs.ipv4|length) + (iocs.url|length) }}</div>
      <div class="stat-lbl">Network IOCs</div>
    </div>
  </div>
  {% if findings %}
  <div class="section">
    <div class="section-header">Threat Findings
      <span class="count-badge">{{ findings|length }}</span></div>
    {% for f in findings %}
    <div class="finding">
      <div class="finding-header">
        <span class="sev sev-{{ f.severity }}">{{ f.severity }}</span>
        <span class="finding-title">{{ f.title }}</span>
        {% if f.process %}<span>{{ f.process }}
        {% if f.pid %}(PID {{ f.pid }}){% endif %}</span>{% endif %}
      </div>
      <div class="finding-detail">{{ f.detail }}</div>
    </div>
    {% endfor %}
  </div>
  {% endif %}
  {% if iocs.yara_hits %}
  <div class="section">
    <div class="section-header">Yara Rule Matches <span class="count-badge">{{ iocs.yara_hits|length }}</span></div>
    {% for hit in iocs.yara_hits %}
    <div class="finding"><div class="finding-detail">{{ hit }}</div></div>
    {% endfor %}
  </div>
  {% endif %}
  <div class="section">
    <div class="section-header">Extracted IOCs</div>
    <div class="ioc-grid">
      {% for ioc_type, values in iocs.items() %}
      {% if values and ioc_type != 'yara_hits' %}
      <div class="ioc-group">
        <div class="ioc-label">{{ ioc_type.replace('_',' ') }} ({{ values|length }})</div>
        {% for v in values[:20] %}
        <div class="ioc-value">{{ v }}</div>
        {% endfor %}
        {% if values|length > 20 %}<div class="ioc-label">...{{ values|length - 20 }} more</div>{% endif %}
      </div>
      {% endif %}
      {% endfor %}
    </div>
  </div>
  <div class="section">
    <div class="section-header">Network Connections <span class="count-badge">{{ network|length }}</span></div>
    <table>
      <tr><th>PID</th><th>Process</th><th>Local</th><th>Remote</th><th>Port</th><th>State</th></tr>
      {% for conn in network[:50] %}
      <tr>
        <td>{{ conn[6] if conn|length > 6 else '' }}</td>
        <td>{{ conn[7] if conn|length > 7 else '' }}</td>
        <td>{{ conn[1] if conn|length > 1 else '' }}</td>
        {% set r = conn[3] if conn|length > 3 else '' %}
        <td>{{ r }}
          {% if r and not r.startswith(('10.','192.168.','172.','127.')) %}
          <span class="tag tag-ext">EXT</span>{% endif %}</td>
        <td>{{ conn[4] if conn|length > 4 else '' }}</td>
        <td>{{ conn[5] if conn|length > 5 else '' }}</td>
      </tr>
      {% endfor %}
    </table>
  </div>
  <div class="section">
    <div class="section-header">Process List <span class="count-badge">{{ processes|length }}</span></div>
    <table>
      <tr><th>PID</th><th>PPID</th><th>Name</th><th>Path</th></tr>
      {% for proc in processes[:100] %}
      <tr>
        <td>{{ proc[0] if proc|length > 0 else '' }}</td>
        <td>{{ proc[1] if proc|length > 1 else '' }}</td>
        <td>{{ proc[2] if proc|length > 2 else '' }}</td>
        <td>{{ proc[10] if proc|length > 10 else '' }}</td>
      </tr>
      {% endfor %}
    </table>
  </div>
</div>
</body>
</html>
"""
def generate_reports(meta: Dict, findings: List, iocs: Dict,
                     plugin_results: Dict) -> Dict:
    """Generate HTML and JSON reports. Returns paths to both files."""
    risk_score = sum(f.score for f in findings)
    risk_label = (
        'CRITICAL' if risk_score >= 60 else
        'HIGH' if risk_score >= 30 else
        'MEDIUM' if risk_score >= 10 else
        'LOW'
    )
    # One timezone-aware timestamp for the whole report, so the directory name,
    # JSON field and HTML header agree and the printed zone label is accurate
    # (the previous code labelled local time as UTC)
    now = datetime.now().astimezone()
    hostname = meta['filename'].split('_')[0]
    timestamp = now.strftime('%Y%m%d_%H%M%S')
    report_dir = REPORT_DIR / f"{hostname}_{timestamp}"
    report_dir.mkdir(parents=True, exist_ok=True)
    findings_dicts = [f.to_dict() for f in findings]
    # ── JSON report ───────────────────────────────────────────────────────────
    json_data = {
        'meta': meta,
        'generated_at': now.isoformat(),
        'risk_label': risk_label,
        'risk_score': risk_score,
        'findings': findings_dicts,
        'iocs': iocs,
        'summary': {
            'critical': sum(1 for f in findings if f.severity == 'CRITICAL'),
            'high': sum(1 for f in findings if f.severity == 'HIGH'),
            'medium': sum(1 for f in findings if f.severity == 'MEDIUM'),
            'yara_hits': len(iocs.get('yara_hits', [])),
            'ioc_count': sum(len(v) for v in iocs.values()),
        }
    }
    json_path = report_dir / 'report.json'
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, indent=2, default=str)
    # ── HTML report ───────────────────────────────────────────────────────────
    tmpl = Template(HTML_TEMPLATE)
    html = tmpl.render(
        meta=meta,
        generated_at=now.strftime('%Y-%m-%d %H:%M:%S %Z'),
        risk_label=risk_label,
        risk_score=risk_score,
        findings=findings_dicts,
        iocs=iocs,
        network=plugin_results.get('windows.netscan',
                 plugin_results.get('netscan',
                 plugin_results.get('linux.netstat', []))),
        processes=plugin_results.get('windows.pslist',
                   plugin_results.get('pslist',
                   plugin_results.get('linux.pslist', []))),
    )
    html_path = report_dir / 'report.html'
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(html)
    log.info(f"Reports saved to {report_dir}")
    return {
        'json': str(json_path),
        'html': str(html_path),
        'risk_label': risk_label,
        'risk_score': risk_score,
        'finding_count': len(findings),
        'report_dir': str(report_dir),
    }
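Many SIEM pipelines ingest newline-delimited JSON more readily than one nested document. As a rough sketch of how report.json could be flattened for that purpose, the function below emits one JSON line per finding and copies the image-level context onto each line. The function name report_to_events and the event_type field are assumptions for illustration, not part of the script above, and the sample report is invented.

```python
import json
from typing import Dict, Iterator


def report_to_events(report: Dict) -> Iterator[str]:
    """Yield one JSON line per finding, each carrying the image-level context."""
    context = {
        'image': report.get('meta', {}).get('filename', ''),
        'generated_at': report.get('generated_at', ''),
        'risk_label': report.get('risk_label', ''),
        'risk_score': report.get('risk_score', 0),
    }
    for finding in report.get('findings', []):
        # event_type is a hypothetical field name; rename to suit your SIEM schema
        event = {**context, 'event_type': 'memory_finding', **finding}
        yield json.dumps(event, sort_keys=True, default=str)


# Invented sample shaped like the json_data dict written by generate_reports
sample_report = {
    'meta': {'filename': 'WS01_20260524.raw'},
    'generated_at': '2026-05-24T10:00:00',
    'risk_label': 'HIGH',
    'risk_score': 35,
    'findings': [
        {'severity': 'HIGH', 'score': 25, 'title': 'Injected code region', 'pid': 4312},
        {'severity': 'MEDIUM', 'score': 10, 'title': 'Unusual parent process', 'pid': 988},
    ],
}
events = list(report_to_events(sample_report))
for line in events:
    print(line)
```

Each line is self-describing, so a finding can be searched or alerted on without joining back to the parent report.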
The main entry point: analyse.py
#!/usr/bin/env python3
## /opt/memory-hunter/analyse.py
## THE ONE SCRIPT TO RUN
## Usage: python3 analyse.py /path/to/memory.raw
## Usage: python3 analyse.py /path/to/memory.raw --workers 16 --no-vol2
import sys
import argparse
import logging
import time
import json
from datetime import datetime
from pathlib import Path
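# Add scripts directory to path. resolve() follows symlinks, so this still works
# when the script is invoked through the /usr/local/bin/memory-hunt link.
sys.path.insert(0, str(Path(__file__).resolve().parent / 'scripts'))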
from phase1_identify import identify_image
from vol_runner import collect_all_plugins
from anomaly_checks import run_all_checks
from phase4_iocs import extract_iocs
from phase5_report import generate_reports
def setup_logging(log_file: str = None) -> None:
    handlers = [logging.StreamHandler(sys.stdout)]
    if log_file:
        handlers.append(logging.FileHandler(log_file))
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=handlers
    )
def print_banner():
    print("""
╔══════════════════════════════════════════════════════╗
║ Memory Hunter - Automated Analysis ║
║ Volatility 2 + 3 | Windows + Linux | HTML + JSON ║
╚══════════════════════════════════════════════════════╝
""")
def print_summary(report_info: dict, elapsed: float):
    risk = report_info['risk_label']
    colour = {
        'CRITICAL': '\033[91m', 'HIGH': '\033[93m',
        'MEDIUM': '\033[33m', 'LOW': '\033[92m'
    }.get(risk, '')
    reset = '\033[0m'
    print(f"""
{'='*60}
ANALYSIS COMPLETE ({elapsed:.1f} seconds)
{'='*60}
Risk Level: {colour}{risk}{reset}
Risk Score: {report_info['risk_score']}
Findings: {report_info['finding_count']}
HTML Report: {report_info['html']}
JSON Report: {report_info['json']}
{'='*60}
""")
def main():
    print_banner()
    parser = argparse.ArgumentParser(
        description='Automated memory image analysis for threat hunters'
    )
    parser.add_argument('image', help='Path to memory image file')
    parser.add_argument('--workers', type=int, default=8,
                        help='Parallel plugin workers (default: 8)')
    parser.add_argument('--no-vol2', action='store_true',
                        help='Skip Volatility 2 plugins')
    parser.add_argument('--output-dir', default='/opt/memory-hunter/reports',
                        help='Report output directory')
    parser.add_argument('--yara-rules', default='/opt/memory-hunter/yara_rules/combined.yar',
                        help='Path to compiled Yara rules')
    parser.add_argument('--log-file', help='Write log to file')
    parser.add_argument('--quiet', action='store_true',
                        help='Reduce output verbosity')
    args = parser.parse_args()
    setup_logging(args.log_file)
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    log = logging.getLogger(__name__)
    # Apply CLI overrides to the module-level defaults read by phases 4 and 5;
    # without this, --yara-rules and --output-dir are parsed but never used
    import phase4_iocs
    import phase5_report
    phase4_iocs.YARA_RULES = args.yara_rules
    phase5_report.REPORT_DIR = Path(args.output_dir)
    start_time = time.time()
    image_path = str(Path(args.image).absolute())
    print(f"[*] Image: {image_path}")
    print(f"[*] Workers: {args.workers}")
    print()
    # ── Phase 1: Identify ────────────────────────────────────────────────────
    print("[Phase 1/5] Identifying image...")
    meta = identify_image(image_path)
    print(f" OS Type: {meta.get('os_type', 'unknown')}")
    print(f" OS Version: {meta.get('os_version', 'unknown')}")
    print(f" Size: {meta.get('size_gb', 0):.1f} GB")
    print(f" SHA256: {meta.get('sha256', '')[:32]}...")
    print()
    if not meta.get('os_type'):
        print("[!] Could not identify OS type - check symbol tables")
        print(" Windows: ensure /opt/vol3-symbols/windows/ is populated")
        print(" Linux: ensure ISF file exists for this kernel version")
        # Exit code 4 keeps this failure distinct from the 0-3 risk-level codes
        sys.exit(4)
    # ── Phase 2: Collect ─────────────────────────────────────────────────────
    print(f"[Phase 2/5] Running plugin collection ({args.workers} workers)...")
    # NOTE: args.no_vol2 is not forwarded by this call; pass it through if your
    # collect_all_plugins() accepts a way to skip the Volatility 2 plugins
    plugin_results = collect_all_plugins(image_path, meta, args.workers)
    print(f" Plugins completed: {len(plugin_results)}")
    print()
    # ── Phase 3: Detect ──────────────────────────────────────────────────────
    print("[Phase 3/5] Running anomaly detection...")
    findings, risk_score = run_all_checks(plugin_results)
    critical = sum(1 for f in findings if f.severity == 'CRITICAL')
    high = sum(1 for f in findings if f.severity == 'HIGH')
    print(f" Findings: {len(findings)} total ({critical} critical, {high} high)")
    if findings:
        print(" Top findings:")
        for f in findings[:5]:
            print(f" [{f.severity}] {f.title}")
    print()
    # ── Phase 4: IOCs ────────────────────────────────────────────────────────
    print("[Phase 4/5] Extracting IOCs...")
    # De-duplicate PIDs while keeping the highest-scoring findings first,
    # because only the first ten PIDs are passed to string extraction
    suspicious_pids = list(dict.fromkeys(f.pid for f in findings if f.pid))
    iocs = extract_iocs(plugin_results, image_path, suspicious_pids)
    ioc_count = sum(len(v) for v in iocs.values())
    print(f" IOCs extracted: {ioc_count}")
    if iocs.get('yara_hits'):
        print(f" YARA hits: {len(iocs['yara_hits'])}")
        for hit in iocs['yara_hits'][:3]:
            print(f" -> {hit}")
    print()
    # ── Phase 5: Report ──────────────────────────────────────────────────────
    print("[Phase 5/5] Generating reports...")
    report_info = generate_reports(meta, findings, iocs, plugin_results)
    elapsed = time.time() - start_time
    print_summary(report_info, elapsed)
    # Exit code reflects risk level for CI/CD integration
    exit_codes = {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}
    sys.exit(exit_codes.get(report_info['risk_label'], 0))


if __name__ == '__main__':
    main()
Making it truly one command to run
## Install the script system-wide
sudo ln -sf /opt/memory-hunter/analyse.py /usr/local/bin/memory-hunt
sudo chmod +x /opt/memory-hunter/analyse.py
## Now you can run from anywhere:
memory-hunt /path/to/image.raw
## With options:
memory-hunt /path/to/image.raw --workers 16 --log-file /tmp/analysis.log
## In a pipeline (exit code 0 = LOW risk; any non-zero code means MEDIUM or higher, or that the run failed):
memory-hunt suspicious.raw && echo "LOW RISK" || echo "FINDINGS OR ERROR - CHECK REPORT"
## Process multiple images in parallel
ls /srv/memory/landing/*.raw | \
parallel -j 4 memory-hunt {} --log-file /srv/memory/logs/{/.}.log
## Quick check with reduced scope (faster for initial triage)
memory-hunt image.raw --workers 4 --no-vol2
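If GNU parallel is not available on the analysis host, the same fan-out can be sketched in Python with a thread pool. The example below assumes memory-hunt is on the PATH as installed above; the function names run_memory_hunt and analyse_batch are hypothetical. Threads are sufficient here because each worker just waits on a subprocess.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable, Dict, Iterable


def run_memory_hunt(image: Path) -> int:
    """Run the installed memory-hunt command and return its exit code."""
    return subprocess.run(['memory-hunt', str(image)]).returncode


def analyse_batch(images: Iterable[Path],
                  runner: Callable[[Path], int] = run_memory_hunt,
                  jobs: int = 4) -> Dict[str, int]:
    """Analyse images concurrently; returns {image path: exit code}."""
    images = list(images)
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        # pool.map preserves input order, so codes line up with images
        codes = list(pool.map(runner, images))
    return {str(img): code for img, code in zip(images, codes)}


# Example call (not executed here):
# results = analyse_batch(Path('/srv/memory/landing').glob('*.raw'), jobs=4)
```

The runner parameter exists so the batching logic can be exercised with a fake function before pointing it at real images. Keep jobs modest: each analysis already runs several Volatility workers of its own.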
Linux image support and ISF generation
## Linux memory analysis requires ISF (Intermediate Symbol Format) files
## These must be generated for each specific kernel version being analysed
## Method 1: Generate ISF from a running system (same kernel as the image)
## Install dwarf2json
wget https://github.com/volatilityfoundation/dwarf2json/releases/latest/download/dwarf2json-linux-amd64
chmod +x dwarf2json-linux-amd64
## Generate ISF from the running kernel (on the target or identical system)
## The vmlinux path below exists only once the kernel debug symbols package is installed
## The output redirect runs as the calling user, so write through sudo tee
sudo ./dwarf2json-linux-amd64 linux \
  --elf /usr/lib/debug/boot/vmlinux-$(uname -r) \
  | sudo tee /opt/vol3-symbols/linux/$(uname -r).json > /dev/null
## Method 2: Generate from vmlinux debug symbols package plus System.map
## On Ubuntu/Debian:
sudo apt install linux-image-$(uname -r)-dbgsym 2>/dev/null || \
  sudo apt install linux-image-$(uname -r)-dbg
sudo ./dwarf2json-linux-amd64 linux \
  --elf /usr/lib/debug/boot/vmlinux-$(uname -r) \
  --system-map /boot/System.map-$(uname -r) \
  | sudo tee /opt/vol3-symbols/linux/$(uname -r).json > /dev/null
## Verify Volatility can use the ISF
vol -f linux_memory.lime linux.pslist
## Automating ISF generation for a fleet
## Run this on each unique kernel version in your environment
python3 << 'EOF'
import subprocess, os, sys
from pathlib import Path
ISF_DIR = Path('/opt/vol3-symbols/linux')
ISF_DIR.mkdir(parents=True, exist_ok=True)
kernel_version = subprocess.run(['uname', '-r'],
                                capture_output=True, text=True).stdout.strip()
isf_path = ISF_DIR / f"{kernel_version}.json"
if isf_path.exists():
    print(f"ISF already exists for {kernel_version}")
    sys.exit(0)
# Try to find vmlinux debug symbols
vmlinux_paths = [
    f'/usr/lib/debug/boot/vmlinux-{kernel_version}',
    f'/usr/lib/debug/lib/modules/{kernel_version}/vmlinux',
    f'/boot/vmlinux-{kernel_version}',
]
vmlinux = next((p for p in vmlinux_paths if os.path.exists(p)), None)
if not vmlinux:
    print(f"No debug symbols found for {kernel_version}")
    print(f"Install: apt install linux-image-{kernel_version}-dbgsym")
    sys.exit(1)
# Adjust this path to wherever the dwarf2json binary was saved
result = subprocess.run([
    '/opt/dwarf2json-linux-amd64', 'linux',
    '--elf', vmlinux,
], capture_output=True, timeout=300)
if result.returncode == 0:
    isf_path.write_bytes(result.stdout)
    print(f"ISF generated: {isf_path} ({len(result.stdout)//1024}KB)")
else:
    # stderr is bytes because text=True is not set; decode it for readable output
    print(f"dwarf2json failed: {result.stderr.decode(errors='replace')}")
    sys.exit(1)
EOF
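Before analysing Linux images at scale, it helps to know which kernel versions in the fleet still lack an ISF file. A minimal sketch, assuming ISF files are named <kernel version>.json as in the commands above; the function name missing_isf and the kernel version strings are invented for illustration.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List


def missing_isf(kernel_versions: Iterable[str], isf_dir: Path) -> List[str]:
    """Return the kernel versions that have no <version>.json file in isf_dir."""
    return sorted(v for v in set(kernel_versions)
                  if not (isf_dir / f'{v}.json').is_file())


# Demonstration against a throwaway directory rather than /opt/vol3-symbols/linux
with tempfile.TemporaryDirectory() as tmp:
    isf_dir = Path(tmp)
    (isf_dir / '5.15.0-91-generic.json').write_text('{}')
    fleet = ['5.15.0-91-generic', '6.1.0-18-amd64', '5.15.0-91-generic']
    gaps = missing_isf(fleet, isf_dir)
print(gaps)
```

Feeding this the output of uname -r collected from each endpoint gives a short list of hosts on which the generation script still needs to run.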
Troubleshooting the automation pipeline
## Common issues and their fixes
## Issue 1: "Unsatisfied requirement" errors from Volatility 3
## This means symbol tables are missing or mislinked
source /opt/vol3-env/bin/activate
python3 -c "
import volatility3.symbols as sym
import os
sym_path = os.path.dirname(sym.__file__)
win_path = os.path.join(sym_path, 'windows')
print(f'Symbol path: {sym_path}')
print(f'Windows syms: {os.path.isdir(win_path)}')
if os.path.isdir(win_path):
    files = os.listdir(win_path)
    print(f'Files: {len(files)} (sample: {files[:2]})')
"
## Fix: re-link symbols (-n replaces an existing link instead of nesting a new one inside it)
SITE=$(python3 -c "import site; print(site.getsitepackages()[0])")
ln -sfn /opt/vol3-symbols/windows "$SITE/volatility3/symbols/windows"
## Issue 2: Plugin times out on large images
## Increase timeout values in vol_runner.py or limit to core plugins only
## The --workers flag does not help here - timeouts are per-plugin
## Issue 3: Yara scan returns no results despite known malware in image
## Check compiled ruleset is not corrupted
python3 -c "
import yara
try:
    rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
    print('Ruleset loaded OK')
except Exception as e:
    print(f'Error: {e} - recompile the ruleset')
"
## Recompile from source rules:
python3 -c "
import yara, glob
rule_files = {}
for f in glob.glob('/opt/memory-hunter/yara_rules/rules/*.yar'):
    rule_files[f.split('/')[-1].replace('.yar','')] = f
combined = yara.compile(filepaths=rule_files)
combined.save('/opt/memory-hunter/yara_rules/combined.yar')
print(f'Compiled {len(rule_files)} rule files')
"
## Issue 4: HTML report renders but shows no data
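## The template assumes Vol3 list format [pid, ppid, name, ...path at index 10]
## Different Vol3 versions may change column ordering
## Debug by checking the column names in the raw plugin output
## (handles both a {"columns": ..., "rows": ...} object and a plain list of rows):
vol -f image.raw --renderer json windows.pslist | \
  python3 -c "import json,sys; d=json.load(sys.stdin); print(d['columns'] if isinstance(d, dict) else list(d[0]) if d else 'no rows')"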
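One way to make the report and the checks independent of column order is to normalise every plugin result into a list of dictionaries keyed by column name before anything indexes into it. The sketch below accepts the shapes the script's own parsing code anticipates (a dict with rows, or a bare list of rows). The function name normalise_rows is hypothetical, and the __children key is an assumption about what Volatility 3's JSON renderer attaches to each row object.

```python
from typing import Any, Dict, List


def normalise_rows(data: Any) -> List[Dict[str, Any]]:
    """Return plugin output as a list of {column name: value} dicts."""
    if isinstance(data, dict):
        columns = data.get('columns', [])
        rows = data.get('rows', [])
    else:
        columns, rows = [], data or []
    out = []
    for row in rows:
        if isinstance(row, dict):
            # Assumed renderer detail: drop the nested-children bookkeeping key
            out.append({k: v for k, v in row.items() if k != '__children'})
        elif isinstance(row, (list, tuple)):
            # No column names available: fall back to positional string keys
            names = columns or [str(i) for i in range(len(row))]
            out.append(dict(zip(names, row)))
    return out
```

With rows in this form, a template or check can ask for row.get('PID') instead of row[0], and a reordered or inserted column no longer produces an empty report.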
## Issue 5: analyse.py runs but risk score is always 0
## Check anomaly_checks.py is receiving data by printing plugin result sizes:
python3 -c "
import sys; sys.path.insert(0,'/opt/memory-hunter/scripts')
from vol_runner import collect_all_plugins
from phase1_identify import identify_image
meta = identify_image(sys.argv[1])
results = collect_all_plugins(sys.argv[1], meta, 4)
for name, data in sorted(results.items()):
    count = len(data) if isinstance(data, list) else len(data.splitlines())
    print(f'{name}: {count} records')
" /path/to/image.raw
Integrating with the fleet pipeline
The single-image automation script integrates cleanly with the fleet collection pipeline described in the companion post. When the image watcher detects a new image it can call the analyse.py script directly instead of the Celery task chain, which is useful for simpler deployments that do not need the full distributed pipeline.
## Simple integration with the image watcher
## Replace the validate_image.delay() call with a direct script invocation
## In image_watcher.py, replace:
## validate_image.delay(image_id)
## With:
import subprocess
subprocess.Popen([
'/opt/vol3-env/bin/python3',
'/opt/memory-hunter/analyse.py',
str(path),
'--workers', '8',
'--log-file', f'/srv/memory/logs/{hostname}_{timestamp}.log',
'--output-dir', f'/srv/memory/reports/{hostname}',
])
## The exit code from analyse.py maps to risk level:
## 0 = LOW (no concerning findings)
## 1 = MEDIUM
## 2 = HIGH
## 3 = CRITICAL
## Use this in automation to trigger different response actions
## Example: auto-isolate a host if CRITICAL findings
result = subprocess.run([
    '/opt/vol3-env/bin/python3',
    '/opt/memory-hunter/analyse.py',
    image_path,
], capture_output=True)
# isolate_host() and notify_soc() are placeholders for your own integrations
if result.returncode == 3:
    log.warning(f"CRITICAL findings in {hostname} - triggering isolation workflow")
    # Call your EDR/firewall API to isolate the host
    isolate_host(hostname)
elif result.returncode == 2:
    log.warning(f"HIGH findings in {hostname} - notifying SOC")
    notify_soc(hostname)
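An exit code on its own can be ambiguous: argparse exits with 2 on a usage error and an unhandled Python exception exits with 1, which overlap with the HIGH and MEDIUM codes. A cautious wrapper can confirm that the run actually finished before acting on the code. This sketch relies on the "ANALYSIS COMPLETE" line that print_summary writes at the end of a successful run; the function name risk_from_run is hypothetical.

```python
from typing import Dict, Optional

RISK_BY_EXIT_CODE: Dict[int, str] = {0: 'LOW', 1: 'MEDIUM', 2: 'HIGH', 3: 'CRITICAL'}


def risk_from_run(returncode: int, stdout: str) -> Optional[str]:
    """Map an exit code to a risk label, or None if the run did not finish."""
    # analyse.py prints 'ANALYSIS COMPLETE' only after all five phases succeed;
    # without it, a code of 1 or 2 may be a failure rather than a risk level.
    if 'ANALYSIS COMPLETE' not in stdout:
        return None
    return RISK_BY_EXIT_CODE.get(returncode)
```

When calling subprocess.run with capture_output=True, pass text=True as well (or decode result.stdout) so the captured output is a string. A None result should be routed to an operator rather than treated as a clean image.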
The HTML report template in full
The report template referenced in phase5_report.py is a Jinja2 template. The listing above embeds it inline as the HTML_TEMPLATE string. To keep it as a standalone file at /opt/memory-hunter/templates/report.html.j2 instead, replace the inline string with a file load:
## In phase5_report.py, replace the HTML_TEMPLATE string with:
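from jinja2 import Environment, FileSystemLoader
# autoescape=True HTML-escapes rendered values, which matters here because
# command lines and strings recovered from memory are attacker-controlled
env = Environment(loader=FileSystemLoader('/opt/memory-hunter/templates'),
                  autoescape=True)
tmpl = env.get_template('report.html.j2')
html = tmpl.render(...)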
Save the template content from the HTML_TEMPLATE variable in phase5_report.py to /opt/memory-hunter/templates/report.html.j2. The template uses standard Jinja2 syntax throughout: {{ variable }} for output, {% for item in list %} for loops, {% if condition %} for conditionals. No additional template dependencies are needed beyond Jinja2 itself.
Yara rules: complete content for all four rule files
These are the four rule files that belong in /opt/memory-hunter/yara_rules/rules/. Each focuses on a different threat category relevant to Windows endpoint memory analysis.
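YARA refuses to compile a rule that defines a string identifier the condition never references, and one such rule is enough to fail the whole combined ruleset. As a quick pre-check before compiling, the sketch below scans rule source text for that mistake using only the standard library. It is a regex-based approximation, not a YARA parser; the function name unreferenced_strings and the two demo rules are invented, and compiling with yara-python remains the authoritative test.

```python
import re
from typing import Dict, List


def unreferenced_strings(source: str) -> Dict[str, List[str]]:
    """Map rule name -> string identifiers defined but never used in its condition."""
    problems: Dict[str, List[str]] = {}
    # Split on rule headers; parts[0] is whatever precedes the first rule
    parts = re.split(r'(?m)^\s*(?:private\s+|global\s+)*rule\s+(\w+)', source)
    for name, body in zip(parts[1::2], parts[2::2]):
        before_condition, _, condition = body.partition('condition:')
        _, _, strings_part = before_condition.partition('strings:')
        defined = re.findall(r'(?m)^\s*(\$\w+)\s*=', strings_part)
        # Drop // comments so commented-out identifiers do not count as uses
        condition = re.sub(r'//.*', '', condition)
        if re.search(r'\bthem\b', condition):
            continue  # "them" references every string in the rule
        # $a, #a, @a and !a all count as references to string a
        used = {'$' + n for n in re.findall(r'[$#@!](\w+)', condition)}
        wildcards = [w[:-1] for w in re.findall(r'\$\w*\*', condition)]
        unused = [s for s in defined
                  if s not in used and not any(s.startswith(w) for w in wildcards)]
        if unused:
            problems[name] = unused
    return problems


# Invented demo rules: the first leaves $c unused, the second uses a wildcard
sample = '''
rule Demo_Unused {
    strings:
        $a = "alpha"
        $b = "beta"
        $c = { 00 27 00 01 }
    condition:
        $a and $b
}
rule Demo_All_Used {
    strings:
        $x1 = "one"
        $x2 = "two"
    condition:
        any of ($x*)
}
'''
result = unreferenced_strings(sample)
print(result)
```

Running this over each file in /opt/memory-hunter/yara_rules/rules/ before the recompile step points at the offending rule by name, which is easier to act on than a failed compile of the combined set.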
// File: /opt/memory-hunter/yara_rules/rules/cobalt_strike.yar
// Detects Cobalt Strike beacon variants in process memory
rule CobaltStrike_Beacon_Config_Decoded {
meta:
description = "Detects decoded Cobalt Strike beacon configuration in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
reference = "https://blog.cobaltstrike.com/2021/02/09/learn-pipe-fitting-for-all-of-your-offense-projects/"
strings:
// Beacon config block header pattern (appears in decoded config)
$cfg_header = { 00 01 00 01 00 00 00 ?? 00 02 00 01 }
// Default HTTP GET URI patterns in decoded beacon memory
$uri_check = "/updates/check" ascii wide
$uri_submit = "/submit.php" ascii wide
$uri_cdn = "/CDN/" ascii wide
// Sleep mask instruction sequence common across CS versions
$sleep_mask = { C7 44 24 ?? 01 00 00 00 EB ?? }
// Reflective loader export (present in memory even without file on disk)
$ref_loader = "ReflectiveLoader" ascii fullword
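// Named pipe patterns for SMB beacon (backslashes must be doubled in YARA text strings)
$pipe_msse = "\\\\.\\pipe\\MSSE-" wide ascii
$pipe_postex = "\\\\.\\pipe\\postex_" wide ascii
$pipe_status = "\\\\.\\pipe\\status_" wide ascii
// Watermark field marker in beacon config. Left commented out because YARA refuses
// to compile a rule containing a string that the condition never references.
// $watermark = { 00 27 00 01 }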
condition:
// PE in memory or raw shellcode context
(($ref_loader or $sleep_mask) and 1 of ($uri_check, $uri_submit, $uri_cdn, $cfg_header))
or 2 of ($pipe_msse, $pipe_postex, $pipe_status)
}
rule CobaltStrike_Shellcode_Stager {
meta:
description = "Detects Cobalt Strike shellcode stager in memory"
confidence = "medium"
strings:
// Common x64 CS stager prologue
$stager_x64 = { FC 48 83 E4 F0 E8 C? 00 00 00 }
// Common x86 CS stager
$stager_x86 = { FC E8 8? 00 00 00 60 89 E5 }
// x86 PEB walk (fs:[0x30] -> Ldr -> module list) used by stagers, including the DNS stager, to resolve APIs
$dns_stager = { 64 A1 30 00 00 00 8B 40 0C 8B 40 1C }
condition:
any of them
}
rule CobaltStrike_MalleableC2_Indicators {
meta:
description = "Detects indicators of Cobalt Strike Malleable C2 profiles"
confidence = "low"
strings:
// Loader and sleep-mask patterns, repeated from CobaltStrike_Beacon_Config_Decoded
// because YARA string identifiers are local to the rule that defines them
$ref_loader = "ReflectiveLoader" ascii fullword
$sleep_mask = { C7 44 24 ?? 01 00 00 00 EB ?? }
// Common Malleable C2 Amazon profile indicators
$amz_host = "s3.amazonaws.com" ascii wide
$amz_sdk = "aws-sdk-go" ascii wide
// Common Malleable C2 Office 365 profile
$o365 = "outlook.office365.com" ascii wide
// Common fake user agents in profiles
$ua_excel = "Microsoft Excel" ascii wide
$ua_teams = "Teams/1." ascii wide
condition:
// Only meaningful in context of other CS indicators
($ref_loader or $sleep_mask) and
(1 of ($amz_host, $amz_sdk, $o365, $ua_excel, $ua_teams))
}
// File: /opt/memory-hunter/yara_rules/rules/meterpreter.yar
// Detects Meterpreter variants in process memory
rule Meterpreter_Reflective_DLL_x64 {
meta:
description = "Detects Meterpreter x64 reflective DLL loaded in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
strings:
$mz = { 4D 5A }
$ref_loader = "ReflectiveLoader" ascii fullword
// Meterpreter core extension names
$stdapi = "stdapi_" ascii
$priv = "priv_elevate" ascii
$incognito = "incognito_" ascii
$kiwi = "kiwi_cmd" ascii
// Meterpreter transport strings
$transport = "METERPRETER_TRANSPORT_" ascii
$pivot = "pivot_" ascii
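// Session GUID pattern in Meterpreter memory, kept for reference only. It is commented
// out because a hex string cannot contain character classes (it would have to be a regex
// such as /[0-9A-F]{8}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{4}-[0-9A-F]{12}/) and because the
// condition never references it, which YARA treats as a compile error.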
condition:
$mz at 0
and $ref_loader
and 2 of ($stdapi, $priv, $incognito, $kiwi, $transport, $pivot)
}
rule Meterpreter_Shellcode_Reverse_TCP {
meta:
description = "Detects Meterpreter reverse TCP shellcode in memory"
confidence = "high"
strings:
// Common Meterpreter reverse_tcp x64 shellcode pattern
$rev_tcp_x64 = { 49 BE ?? ?? ?? ?? ?? ?? ?? ?? 41 FF E6 }
// Reverse HTTPS connect sequence
$rev_https = { 68 ?? ?? ?? ?? 68 02 00 }
// LoadLibrary + GetProcAddress resolution pattern
$lib_resolve = { 48 31 C9 48 81 EC D0 00 00 00 }
condition:
any of them
}
rule Meterpreter_Python_Stage {
meta:
description = "Detects Python Meterpreter stage in memory"
confidence = "medium"
strings:
$py_met1 = "met_api" ascii
$py_met2 = "meterpreter.core" ascii
$py_met3 = "MeterpreterSession" ascii
$py_met4 = "from metasploit" ascii nocase
condition:
2 of them
}
// File: /opt/memory-hunter/yara_rules/rules/credential_tools.yar
// Detects credential access tools in process memory
rule Mimikatz_In_Memory {
meta:
description = "Detects Mimikatz and variants loaded in process memory"
author = "justruss"
date = "2026-05-24"
confidence = "high"
strings:
// Core Mimikatz module names
$sekurlsa = "sekurlsa::" ascii wide nocase
$lsadump = "lsadump::" ascii wide nocase
$kerberos = "kerberos::" ascii wide nocase
$crypto = "crypto::" ascii wide nocase
$dpapi = "dpapi::" ascii wide nocase
// Mimikatz output strings
$mimikatz_id = "mimikatz" ascii wide nocase
$priv_debug = "privilege::debug" ascii wide nocase
$logonpw = "logonPasswords" ascii wide
// WDigest provider targeting
$wdigest = "wdigest.dll" ascii wide
$lsasrv = "lsasrv.dll" ascii wide
// Common obfuscated variants still contain these
$ntlm_hash = "NTLM hash" ascii wide nocase
$aes256_key = "AES256 HMAC" ascii wide
condition:
2 of ($sekurlsa, $lsadump, $kerberos, $crypto, $dpapi, $mimikatz_id, $priv_debug)
or ($wdigest and $lsasrv and 1 of ($sekurlsa, $lsadump, $logonpw))
or ($ntlm_hash and $aes256_key and $wdigest)
}
rule Rubeus_Kerberos_Toolkit {
meta:
description = "Detects Rubeus .NET Kerberos attack toolkit in memory"
confidence = "high"
strings:
$rubeus_id = "Rubeus" ascii wide
$asktgt = "asktgt" ascii wide nocase
$kerberoast = "kerberoast" ascii wide nocase
$asreproast = "asreproast" ascii wide nocase
$s4u = " s4u " ascii wide nocase
$ptt = "ptt" ascii wide
$harvest = "harvest" ascii wide nocase
$monitor = "monitor" ascii wide nocase
$dump = "dump" ascii wide
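// MZ/DOS header bytes (present in any PE file, not specific to .NET). Commented out
// because the condition does not reference it and YARA rejects unreferenced strings.
// $dotnet = { 4D 5A 90 00 03 00 00 00 04 00 00 00 FF FF 00 00 }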
condition:
$rubeus_id and 3 of ($asktgt, $kerberoast, $asreproast, $s4u,
$ptt, $harvest, $monitor, $dump)
}
rule SharpHound_BloodHound_Collector {
meta:
description = "Detects SharpHound/BloodHound AD enumeration tool in memory"
confidence = "high"
strings:
$sh1 = "SharpHound" ascii wide nocase
$sh2 = "BloodHound" ascii wide nocase
$sh3 = "Invoke-BloodHound" ascii wide nocase
$ldap1 = "GetAllDomainTrusts" ascii wide
$ldap2 = "GetDomainControllers" ascii wide
$ldap3 = "GetDomainComputers" ascii wide
$ldap4 = "LdapSearcher" ascii wide
$zip = "BloodHound.zip" ascii wide
$json1 = "computers.json" ascii wide
$json2 = "users.json" ascii wide
$json3 = "groups.json" ascii wide
condition:
1 of ($sh1, $sh2, $sh3)
or (3 of ($ldap1, $ldap2, $ldap3, $ldap4, $zip, $json1, $json2, $json3))
}
rule NanoDump_LSASS_Dumper {
meta:
description = "Detects NanoDump or similar LSASS dumping tools in memory"
confidence = "high"
strings:
$nano1 = "nanodump" ascii wide nocase
$nano2 = "NanoDump" ascii wide
// Direct syscall patterns used by NanoDump
$syscall_pattern = { 4C 8B D1 B8 ?? 00 00 00 0F 05 C3 }
// MiniDump callback function name
$minidump = "MiniDumpWriteDump" ascii wide
// LSASS targeting strings common to multiple dumpers
$lsass_name = "lsass.exe" ascii wide nocase
$lsass_pid = "lsass" ascii wide nocase
condition:
$nano1 or $nano2
or ($syscall_pattern and $lsass_name)
or ($minidump and $lsass_pid and $syscall_pattern)
}
rule Seatbelt_Recon_Tool {
meta:
description = "Detects Seatbelt post-exploitation recon tool in memory"
confidence = "high"
strings:
$sb1 = "Seatbelt" ascii wide
$sb2 = "WindowsCredentialFiles" ascii wide
$sb3 = "DpapiMasterKeys" ascii wide
$sb4 = "RDPSavedConnections" ascii wide
$sb5 = "NetworkProfiles" ascii wide
$sb6 = "TokenPrivileges" ascii wide
$sb7 = "PowerShellHistory" ascii wide
condition:
$sb1 or 4 of ($sb2, $sb3, $sb4, $sb5, $sb6, $sb7)
}
// File: /opt/memory-hunter/yara_rules/rules/generic_suspicious.yar
// Generic patterns for suspicious code regardless of tool
rule RWX_PE_In_Anonymous_Memory {
meta:
description = "PE file in executable anonymous memory - possible reflective loading"
author = "justruss"
date = "2026-05-24"
confidence = "medium"
note = "May fire on legitimate .NET JIT or browser JIT - tune per environment"
strings:
$mz_header = { 4D 5A 90 00 }
$pe_sig = { 50 45 00 00 }
condition:
$mz_header at 0 and $pe_sig
}
rule Shellcode_Common_x64_Preambles {
meta:
description = "Common x64 shellcode entry patterns in executable memory"
confidence = "medium"
strings:
// Stack alignment + call setup common in x64 shellcode
$preamble1 = { FC 48 83 E4 F0 E8 }
// GetPC (get program counter) techniques
$getpc1 = { E8 00 00 00 00 59 }
$getpc2 = { E8 00 00 00 00 5B }
// PEB walking to find kernel32 (mov rax, gs:[0x60]; x64 uses the GS prefix 0x65)
$peb_walk = { 65 48 8B 04 25 60 00 00 00 }
// NOP sled into shellcode
$nop_sled = { 90 90 90 90 90 90 90 90 FC 48 }
condition:
any of them
}
rule Suspicious_Named_Pipe {
meta:
description = "Named pipe patterns associated with common C2 frameworks"
confidence = "high"
strings:
// Backslashes must be escaped in Yara text strings, so \\.\pipe\ is written \\\\.\\pipe\\
// Cobalt Strike defaults
$cs_msse = "\\\\.\\pipe\\MSSE-" ascii wide
$cs_postex = "\\\\.\\pipe\\postex_" ascii wide
$cs_msagent = "\\\\.\\pipe\\msagent_" ascii wide
$cs_status = "\\\\.\\pipe\\status_" ascii wide
// Metasploit defaults
$msf_pipe = "\\\\.\\pipe\\metsrv" ascii wide
// Empire defaults
$empire_pipe = "\\\\.\\pipe\\empire" ascii wide nocase
// Generic random-looking pipe names (8+ hex chars)
$hex_pipe = /\\\.\\pipe\\[0-9a-f]{8,}/ ascii wide
condition:
any of ($cs_msse, $cs_postex, $cs_msagent, $cs_status,
$msf_pipe, $empire_pipe)
or $hex_pipe
}
rule AMSI_Bypass_Patterns {
meta:
description = "Detects common AMSI bypass technique byte patterns in memory"
confidence = "high"
strings:
// AmsiScanBuffer patch: mov eax, 0x80070057 (E_INVALIDARG); ret
$amsi_patch1 = { B8 57 00 07 80 C3 }
// AmsiScanBuffer patch: xor eax, eax; ret (very common bytes, only meaningful next to the strings below)
$amsi_patch2 = { 31 C0 C3 }
// AMSI module and export names
$amsi_str = "amsi.dll" ascii wide nocase
$amsi_func = "AmsiScanBuffer" ascii wide
$amsi_init = "AmsiInitialize" ascii wide
condition:
// Every declared string must be referenced, otherwise Yara refuses to compile the rule
($amsi_patch1 or $amsi_patch2) and ($amsi_str or $amsi_func or $amsi_init)
}
rule ETW_Tamper_Patterns {
meta:
description = "Detects ETW patching techniques in process memory"
confidence = "high"
strings:
// EtwEventWrite patch: ret 0x14 (x86 stdcall return; short and common, so it is paired with the names below)
$etw_patch = { C2 14 00 }
// Name of the ntdll export that is usually patched
$etw_func = "EtwEventWrite" ascii wide
// Name of the export used to unregister an ETW provider
$etw_disable = "EtwEventUnregister" ascii wide
condition:
$etw_patch and ($etw_func or $etw_disable)
}
rule PowerShell_Encoded_Payload_Decoded {
meta:
description = "Detects decoded PowerShell payloads in process memory"
confidence = "medium"
strings:
// Download cradle variants after decoding
$dl_string = "DownloadString" ascii wide nocase
$dl_file = "DownloadFile" ascii wide nocase
$dl_data = "DownloadData" ascii wide nocase
$webclient = "Net.WebClient" ascii wide nocase
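// fullword stops the three-letter alias matching inside longer strings
$iex = "IEX" ascii wide fullword
$invoke_exp = "Invoke-Expression" ascii wide nocase
// Reflection loading (PowerShell writes these as [Reflection.Assembly]::Method or [System.Reflection.Assembly]::Method)
$ref_load = "Reflection.Assembly]::Load" ascii wide
$ref_load2 = "Assembly]::LoadWithPartialName" ascii wide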
// Credential theft
$get_cred = "Get-Credential" ascii wide
$sec_string = "ConvertTo-SecureString" ascii wide
$marshal = "SecureStringToGlobalAllocUnicode" ascii wide
condition:
2 of ($dl_string, $dl_file, $dl_data, $webclient, $iex, $invoke_exp,
$ref_load, $ref_load2)
or ($get_cred and $sec_string and $marshal)
}
rule Possible_Beacon_Sleep_Obfuscation {
meta:
description = "Detects sleep obfuscation techniques used by beacons to evade memory scanners"
confidence = "medium"
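note = "Beacons encrypt themselves during sleep to evade memory scanning. These byte patterns also occur in ordinary compiled code, so treat hits as leads to corroborate"
strings:
// Prologue seen in Ekko builds (mov [rsp+8], rbx; mov [rsp+0x10], rsi; push rdi; sub rsp, 0x20); also a generic MSVC x64 prologue
$ekko1 = { 48 89 5C 24 08 48 89 74 24 10 57 48 83 EC 20 }
// Foliage sleep obfuscation pattern (lea rax, [rip+x]; mov [rsp+y], rax; lea rcx, ...)
$foliage = { 48 8D 05 ?? ?? ?? ?? 48 89 44 24 ?? 48 8D 0D }
// Sleepmask XOR key schedule pattern (xor rax, rax; inc rax; cmp rax, imm32)
$sleepmask = { 48 31 C0 48 FF C0 48 3D ?? 00 00 00 }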
condition:
any of them
}
Compiling the combined Yara ruleset
After placing all four files in the rules directory, compile them into a single binary ruleset that Volatility’s vadyarascan plugin can load. The compiled format is faster to load than re-parsing individual text files on every scan.
## Compile all rules into combined.yar
source /opt/vol3-env/bin/activate
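python3 << 'EOF'
import glob
from pathlib import Path
import yara

RULES_DIR = "/opt/memory-hunter/yara_rules/rules"
OUTPUT = "/opt/memory-hunter/yara_rules/combined.yar"

# Same compile step as the Quick start one-liner: each file's base name becomes its namespace
rule_files = {Path(f).stem: f for f in sorted(glob.glob(f"{RULES_DIR}/*.yar"))}
yara.compile(filepaths=rule_files).save(OUTPUT)
print(f"Compiled {len(rule_files)} rule files -> {OUTPUT}")
print(f"Ruleset size: {Path(OUTPUT).stat().st_size / 1024:.1f} KB")
EOF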
## Verify the compiled ruleset loads correctly
python3 -c "
import yara
rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
print('Ruleset loaded OK')
print(f'{sum(1 for _ in rules)} rules available for memory scanning')
"
## Test against a known-clean binary to check false positive rate
python3 -c "
import os
import yara
rules = yara.load('/opt/memory-hunter/yara_rules/combined.yar')
test_files = ['/bin/ls', '/bin/cat', '/usr/bin/python3']
for f in test_files:
    if os.path.exists(f):
        matches = rules.match(f)
        if matches:
            print(f'FP WARNING: {f} matched {[m.rule for m in matches]}')
        else:
            print(f'Clean: {f}')
"
Adding your own Yara rules
The four files above are a starting point covering the most commonly encountered C2 frameworks and credential theft tools. As you encounter new malware families or build rules from your own analysis, add new .yar files to the rules directory and recompile. A few practical notes on writing rules that work well in a memory scanning context follow.
Avoid filesize conditions entirely: when Yara scans process memory regions there is no meaningful file size to test. Avoid conditions that depend on PE structure offsets such as pe.entry_point unless you are certain the region being scanned is a complete PE and not a fragment or raw shellcode. Short strings matched with the fullword modifier are more reliable in memory than bare substring matches, because memory contains a lot of incidental short strings. For strings that appear in both legitimate software and malware (like “cmd.exe” or “powershell”), always pair them with at least one other, more distinctive indicator before the rule can match. A condition that requires three or more strings is almost always more reliable than one that requires a single string, even if that string seemed highly distinctive in the initial sample.
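The notes above can be turned into a rough pre-commit check. The sketch below is a hypothetical helper (lint_rule_text and its min_len threshold are not part of the pipeline). It is a plain text scan rather than a real Yara parser, so it will miss unusual formatting. It flags filesize and pe.entry_point in condition blocks, and short text strings that lack fullword.

```python
import re

# Condition body: everything between "condition:" and the closing brace of the rule.
CONDITION = re.compile(r"condition:\s*(.*?)^\s*\}", re.DOTALL | re.MULTILINE)
# Text string definition such as: $a = "cmd.exe" ascii wide
STRING_DEF = re.compile(r'(\$\w+)\s*=\s*"((?:[^"\\]|\\.)*)"([^\n]*)')

CONDITION_CHECKS = [
    (re.compile(r"\bfilesize\b"), "condition uses filesize, which has no meaning for memory regions"),
    (re.compile(r"\bpe\.entry_point\b"), "condition uses pe.entry_point, unreliable on fragments and shellcode"),
]


def lint_rule_text(source, min_len=6):
    """Return a list of warnings for rule text that conflicts with the notes above.

    min_len is an arbitrary illustrative threshold for what counts as a short string.
    """
    warnings = []
    for body in CONDITION.findall(source):
        for pattern, message in CONDITION_CHECKS:
            if pattern.search(body):
                warnings.append(message)
    for name, value, modifiers in STRING_DEF.findall(source):
        if len(value) < min_len and "fullword" not in modifiers:
            warnings.append(f"{name} is short ({len(value)} chars) and not fullword")
    return warnings


if __name__ == "__main__":
    demo = '''rule Demo {
strings:
$a = "IEX" ascii wide
$b = "Invoke-Expression" ascii wide nocase
condition:
filesize < 1MB and any of them
}'''
    for warning in lint_rule_text(demo):
        print("WARN:", warning)
```

A check like this does not replace testing against clean samples. It only catches the mistakes that are cheap to spot before a rule is compiled.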
The clean_samples directory at /opt/memory-hunter/yara_rules/clean_samples/ is intended for known-clean Windows executables that you run new rules against before deploying them. A useful set to maintain there: a clean copy of common system binaries (ntdll.dll, kernel32.dll, powershell.exe), clean copies of legitimate admin tools (PsExec, Process Explorer), and a small set of benign .NET assemblies. Running every new rule against these before adding it to the compiled ruleset catches false positives before they generate noise in production scans.
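A minimal sketch of that pre-deployment check, assuming the new rule has already been compiled with yara-python. The function name check_clean_samples is hypothetical. It accepts any object with a match(path) method that returns items carrying a .rule attribute, which is the shape of a yara-python ruleset and its match results.

```python
from pathlib import Path


def check_clean_samples(rules, sample_dir):
    """Return {file_path: [rule names]} for every clean sample that matched a rule."""
    hits = {}
    for path in sorted(Path(sample_dir).rglob("*")):
        if not path.is_file():
            continue
        matches = rules.match(str(path))
        if matches:
            hits[str(path)] = sorted(m.rule for m in matches)
    return hits


# Example use (requires yara-python; the rule file name is illustrative):
#   import yara
#   rules = yara.compile(filepath="/opt/memory-hunter/yara_rules/rules/new_rule.yar")
#   hits = check_clean_samples(rules, "/opt/memory-hunter/yara_rules/clean_samples")
#   for sample, names in hits.items():
#       print(f"FP WARNING: {sample} matched {names}")
```

An empty result means the rule stayed quiet on every known-clean file. Any entry is a reason to tighten the rule before recompiling the combined ruleset.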
The complete repository structure
## Clone-ready repository layout
## All scripts from this post organised for immediate use
/opt/memory-hunter/
├── analyse.py # Main entry point - run this
├── setup_volatility_dual.sh # One-shot environment setup
├── requirements.txt # Python dependencies
├── README.md # Quick start guide
│
├── scripts/
│ ├── phase1_identify.py # OS detection and image metadata
│ ├── vol_runner.py # Vol2/Vol3 abstraction + parallel collection
│ ├── anomaly_checks.py # Detection logic and Finding class
│ ├── phase4_iocs.py # IOC extraction and Yara scanning
│ └── phase5_report.py # HTML and JSON report generation
│
├── templates/
│ └── report.html.j2 # Jinja2 HTML report template
│
├── yara_rules/
│ ├── rules/ # Individual .yar files (add yours here)
│ │ ├── cobalt_strike.yar
│ │ ├── meterpreter.yar
│ │ ├── credential_tools.yar
│ │ └── generic_suspicious.yar
│ ├── combined.yar # Compiled ruleset (auto-generated)
│ └── clean_samples/ # Known-clean files for FP testing
│
├── reports/ # Analysis output (gitignored)
└── logs/ # Run logs (gitignored)
## requirements.txt
volatility3
yara-python
pefile
capstone
python-magic
requests
jinja2
tqdm
tabulate
colorama
## Quick start
git clone https://github.com/yourrepo/memory-hunter /opt/memory-hunter
cd /opt/memory-hunter
bash setup_volatility_dual.sh
chmod +x /opt/memory-hunter/analyse.py
ln -sf /opt/memory-hunter/analyse.py /usr/local/bin/memory-hunt
## Compile Yara rules
python3 -c "
import yara, glob
rule_files = {f.split('/')[-1].replace('.yar',''): f
for f in glob.glob('/opt/memory-hunter/yara_rules/rules/*.yar')}
yara.compile(filepaths=rule_files).save('/opt/memory-hunter/yara_rules/combined.yar')
print(f'Compiled {len(rule_files)} rule files')
"
## Run your first analysis
memory-hunt /path/to/memory.raw
The pipeline produces two output files per image: a structured JSON file suitable for SIEM ingestion, scripted comparison across multiple images, or feeding into a correlation pipeline, and an HTML report that opens in any browser with colour-coded severity, IOC tables, full process list, and network connections. The HTML report is designed to be shared with stakeholders who need to understand the findings without running any tools themselves.
The most important design decision in the whole pipeline is the exit code contract. By mapping risk levels to exit codes (0 for clean, 3 for critical), the script integrates cleanly into any automation that knows how to act on a process exit code. Shell scripts, CI/CD pipelines, the image watcher daemon, and orchestration frameworks all speak exit codes natively. A threat hunter who wants to process fifty images and immediately see which ones need attention can run a single parallel command and act on the results without reading any output until something non-zero comes back.
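As a sketch of that batch workflow, the helper below runs the analysis over several images in parallel and returns only the ones with a non-zero exit code. The function names and the worker count are hypothetical. The post only states that 0 means clean and 3 means critical, so sorting by exit code assumes that higher codes mean higher risk. A non-zero code could also come from a crashed run, so check the logs for anything flagged.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_one(image, command=("memory-hunt",)):
    """Run the analysis on one image and return (image, exit_code)."""
    proc = subprocess.run(
        [*command, str(image)],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return str(image), proc.returncode


def triage(images, command=("memory-hunt",), workers=4):
    """Analyse images in parallel and return the non-clean ones, highest exit code first."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(lambda img: run_one(img, command), images))
    flagged = [(img, code) for img, code in results if code != 0]
    # Assumption: a higher exit code means a higher risk level.
    return sorted(flagged, key=lambda item: item[1], reverse=True)


# Example use (the glob pattern is illustrative):
#   from pathlib import Path
#   for image, code in triage(sorted(Path("/cases").glob("*.raw"))):
#       print(f"exit {code}: {image}")
```

Threads are enough here because each worker only waits on a child process. Keep the worker count low, since every analysis is already memory and I/O heavy.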