Etairos.
⚡ InvestCloud Security Lakehouse

AI Layer

Natural language queries, Jira/Zendesk → auto-generated Grafana dashboards, and scheduled anomaly detection - all via AWS Bedrock.

AI Layer: Auto-Dashboards, NL Queries, Jira/Zendesk Integration

Author: RedEye Security | Date: 2026-04-06 | Status: Draft v1.0


Overview

Three AI capabilities: 1. NL Query - “Show me auth failures from EKS last 24h” → Athena SQL → results 2. Dashboard Generator - Jira/Zendesk ticket → Grafana dashboard → auto-deploy PR 3. Anomaly Detection - scheduled AI analysis → Jira ticket if anomaly found

All run on EKS. LLM options: AWS Bedrock (managed) or Ollama on GPU node (self-hosted).


1. Natural Language Query API

User (Grafana panel, Slack, CLI)
      │  POST /query {"question": "auth failures from EKS last 24h"}
      ▼
Query API (FastAPI, EKS)
      │
      ├── 1. Retrieve schema context (Glue catalog → table + column names)
      ├── 2. Build prompt: schema + question → LLM → SQL
      ├── 3. Validate SQL (parse, check table/column names)
      ├── 4. Execute via Athena
      ├── 5. Summarize result → LLM → plain English
      └── 6. Return: {sql, rows, summary, athena_execution_id}

FastAPI service

# query-api/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import boto3, json, time

app = FastAPI()
athena = boto3.client('athena')
bedrock = boto3.client('bedrock-runtime')

SCHEMA_CONTEXT = """
Tables in database: security_lakehouse

network_activity: time(bigint), dt(date), src_endpoint(struct<ip:string,hostname:string>),
  dst_endpoint(struct<ip:string,hostname:string,port:int>), severity_id(int),
  activity_id(int), sourcetype(string), region(string), raw_data(string)

authentication: time(bigint), dt(date), user(struct<name:string,uid:string>),
  src_endpoint(struct<ip:string>), dst_endpoint(struct<hostname:string>),
  status_id(int), status(string), auth_protocol(string), sourcetype(string)

security_finding: time(bigint), dt(date), finding(struct<title:string,desc:string,severity:string>),
  resources(array<struct<type:string,name:string>>), sourcetype(string)
"""

TEXT2SQL_PROMPT = """You are a SQL expert for AWS Athena (Presto SQL dialect).
Convert the question to valid Athena SQL. Rules:
- Always include: WHERE dt >= current_date - interval '7' day (unless user specifies range)
- Use LIMIT 1000 unless asking for counts/aggregates
- For struct fields use dot notation: src_endpoint.ip
- Return ONLY the SQL, no explanation, no markdown

Schema:
{schema}

Question: {question}
SQL:"""

class QueryRequest(BaseModel):
    question: str
    max_rows: int = 100

@app.post("/query")
async def query(req: QueryRequest):
    # Step 1: Text to SQL
    sql = llm_call(TEXT2SQL_PROMPT.format(schema=SCHEMA_CONTEXT, question=req.question))
    sql = sql.strip().rstrip(';')
    
    # Step 2: Execute in Athena
    execution = athena.start_query_execution(
        QueryString=sql,
        WorkGroup='security-lakehouse',
        ResultConfiguration={'OutputLocation': 's3://ic-security-log-archive/athena-results/'}
    )
    exec_id = execution['QueryExecutionId']
    
    # Step 3: Poll for completion
    for _ in range(30):
        status = athena.get_query_execution(QueryExecutionId=exec_id)
        state = status['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED': break
        if state in ('FAILED', 'CANCELLED'):
            raise HTTPException(500, f"Athena query {state}: {status['QueryExecution']['Status'].get('StateChangeReason')}")
        time.sleep(2)
    
    # Step 4: Fetch results
    results = athena.get_query_results(QueryExecutionId=exec_id, MaxResults=req.max_rows)
    rows = parse_athena_results(results)
    
    # Step 5: Summarize
    summary = llm_call(f"Summarize these security findings in 2-3 sentences:\n{json.dumps(rows[:20])}")
    
    return {"sql": sql, "rows": rows, "summary": summary, "execution_id": exec_id}

def llm_call(prompt: str) -> str:
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',  # cheap + fast
        body=json.dumps({"messages": [{"role": "user", "content": prompt}], "max_tokens": 1024, "anthropic_version": "bedrock-2023-05-31"})
    )
    return json.loads(response['body'].read())['content'][0]['text']

2. Dashboard Generator (Jira/Zendesk → Grafana)

Flow

Jira ticket created with label: "dashboard-request"
      │
      │  Jira webhook → POST /generate-dashboard
      ▼
Dashboard Generator API (FastAPI, EKS)
      │
      ├── 1. Parse ticket: title, description, acceptance criteria
      ├── 2. Identify required data: extract sourcetypes/metrics mentioned
      ├── 3. Query Glue catalog: get relevant table schemas
      ├── 4. LLM → generate Grafana dashboard JSON
      ├── 5. Validate JSON (grafana-dash-linter)
      ├── 6. Git: create branch, commit dashboard JSON
      ├── 7. GitLab: open MR with ticket link
      └── 8. Comment on Jira ticket: "Dashboard PR ready: <link>"

Dashboard Generator

# dashboard-gen/app.py
DASHBOARD_PROMPT = """You are a Grafana dashboard expert. Generate a complete Grafana dashboard JSON.

Requirements from ticket:
Title: {title}
Description: {description}
Acceptance criteria: {acceptance_criteria}

Available data (Athena tables):
{schema_context}

Rules:
- Use Athena datasource (uid: "athena-security-lakehouse")  
- All time queries must use $__timeFilter(dt) macro
- Include: title, description, tags, panels array, time range, refresh
- Panel types allowed: timeseries, bargauge, stat, table, piechart
- Each panel needs: title, type, targets (with rawSQL), gridPos
- Use variables: $sourcetype, $region, $environment
- Return ONLY valid JSON, no explanation

Generate the dashboard JSON:"""

@app.post("/generate-dashboard")
async def generate_dashboard(req: DashboardRequest):
    # 1. Get schema for relevant tables
    schema = get_relevant_schema(req.description)
    
    # 2. Generate dashboard JSON via LLM
    dashboard_json = llm_call(DASHBOARD_PROMPT.format(
        title=req.title,
        description=req.description, 
        acceptance_criteria=req.acceptance_criteria,
        schema_context=schema
    ))
    
    # 3. Parse + validate
    dashboard = json.loads(dashboard_json)
    validate_dashboard(dashboard)  # check required fields, SQL syntax
    
    # 4. Commit to GitLab
    branch = f"auto/dashboard-{req.ticket_id}"
    filename = f"grafana/dashboards/auto/{req.ticket_id}-{slugify(req.title)}.json"
    
    gitlab_create_branch(branch)
    gitlab_commit_file(branch, filename, json.dumps(dashboard, indent=2))
    mr_url = gitlab_create_mr(branch, f"Auto: Dashboard for {req.ticket_id} - {req.title}")
    
    # 5. Comment on Jira
    jira_comment(req.ticket_id, f"Dashboard PR ready for review: {mr_url}\nPreview SQL included in PR description.")
    
    return {"mr_url": mr_url, "dashboard_title": dashboard["title"]}

Zendesk integration (same flow)

@app.post("/webhook/zendesk")
async def zendesk_webhook(payload: dict):
    ticket = payload['ticket']
    if 'dashboard-request' not in ticket.get('tags', []):
        return {"status": "ignored"}
    
    await generate_dashboard(DashboardRequest(
        ticket_id=f"ZD-{ticket['id']}",
        title=ticket['subject'],
        description=ticket['description'],
        acceptance_criteria=extract_acceptance_criteria(ticket['description'])
    ))

3. Anomaly Detection

Scheduled job (runs every 15 min via EKS CronJob)

# anomaly-detector/detector.py
ANOMALY_PROMPT = """Analyze this security data for anomalies. 
Look for: unusual auth failure spikes, new source IPs, traffic to unexpected destinations,
volume deviations > 3 sigma from 7-day baseline.

Current window (last 15 min):
{current_data}

7-day baseline (avg per 15-min window):
{baseline_data}

If anomalies found, respond with JSON:
{{"anomalies": [{{"type": str, "severity": "low|medium|high|critical", "description": str, "recommended_action": str}}]}}
If no anomalies, respond: {{"anomalies": []}}"""

def run_detection():
    # Query current + baseline
    current = athena_query("""
        SELECT sourcetype, count(*) as events, count(distinct src_endpoint.ip) as unique_ips
        FROM network_activity
        WHERE time >= (unix_timestamp() - 900) * 1000  -- last 15 min
        GROUP BY sourcetype
    """)
    
    baseline = athena_query("""
        SELECT sourcetype, 
               avg(cnt) as avg_events, 
               stddev(cnt) as stddev_events
        FROM (
            SELECT sourcetype, 
                   date_trunc('minute', from_unixtime(time/1000)) as bucket,
                   count(*) as cnt
            FROM network_activity
            WHERE dt >= current_date - interval '7' day
            GROUP BY 1, 2
        )
        GROUP BY sourcetype
    """)
    
    # Ask LLM
    result = json.loads(llm_call(ANOMALY_PROMPT.format(
        current_data=current.to_string(),
        baseline_data=baseline.to_string()
    )))
    
    # Create Jira tickets for high/critical anomalies
    for anomaly in result['anomalies']:
        if anomaly['severity'] in ('high', 'critical'):
            jira_create_ticket(
                project='SEC',
                summary=f"[AUTO] {anomaly['type']}: {anomaly['description'][:80]}",
                description=f"""
**Detected by:** AI Anomaly Detector
**Severity:** {anomaly['severity']}
**Time:** {datetime.now().isoformat()}

**Description:**
{anomaly['description']}

**Recommended Action:**
{anomaly['recommended_action']}

**Query used:** See attachment
                """,
                priority=anomaly['severity'].capitalize(),
                labels=['auto-detected', 'security-anomaly']
            )

LLM Options

Option Cost Latency Privacy Setup
AWS Bedrock Claude Haiku ~$0.001/query 1-2s AWS boundary Zero - already in AWS
AWS Bedrock Llama 3 ~$0.0006/query 1-3s AWS boundary Zero
Ollama on EKS GPU node ~$0.80/hr (g4dn.xlarge) 2-5s Fully private Medium
OpenRouter ~$0.001/query 1-3s Leaves AWS Already have key

Recommendation: Bedrock Claude Haiku for Text2SQL and summarization (fast, cheap, no egress). Ollama/GPU only if hard data residency requirement.


Grafana AI Panel

{
  "type": "text",
  "title": "AI Security Analyst",
  "options": {
    "content": "<form onsubmit='queryAI(event)'><input id='q' placeholder='Ask a question...' style='width:80%'/><button>Ask</button></form><div id='result'></div><script>async function queryAI(e){e.preventDefault();const r=await fetch('https://api.ic-security.internal/query',{method:'POST',body:JSON.stringify({question:document.getElementById('q').value})});const d=await r.json();document.getElementById('result').innerHTML='<b>SQL:</b><pre>'+d.sql+'</pre><b>Summary:</b><p>'+d.summary+'</p>';}</script>"
  }
}

Security Controls on AI Layer

  • All Bedrock calls stay within AWS VPC endpoint (no public internet)
  • Query API: AD auth required (IAM Identity Center SAML token)
  • SQL injection prevention: LLM output validated against whitelist of allowed tables/columns before execution
  • Athena workgroup: read-only, result encryption, max scan 10GB per query
  • Anomaly tickets: auto-created but require human assignment before action
  • Dashboard PRs: require human MR approval before deploy (not auto-merged)