Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to fix to context by multi-agents #289

Open
Cyarun opened this issue Jan 12, 2025 · 2 comments
Open

Unable to fix to context by multi-agents #289

Cyarun opened this issue Jan 12, 2025 · 2 comments

Comments

@Cyarun
Copy link

Cyarun commented Jan 12, 2025

Below is the code that had to send domain name to the tool and the ai agent should execute the tools collect the response and update the qdrant database. But, when we run the code we see Ai Agents never stuck to the context. I always see the AI Agents asking the tool to lookup exampl.com. Not sure why ?

import os
import requests
import json
import base64
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from praisonaiagents import Agent, Task, PraisonAIAgents, Memory

Load environment variables from .env.local

if not load_dotenv(dotenv_path=".env.local"):
raise FileNotFoundError(".env.local file not found. Please ensure it exists in the working directory.")

API Keys

api_keys = {
"WHOXY_API_KEY": os.getenv("WHOXY_API_KEY"),
"FOFA_API_KEY": os.getenv("FOFA_API_KEY"),
"API_NINJAS_KEY": os.getenv("API_NINJAS_KEY"),
"QDRANT_API_KEY": os.getenv("QDRANT_API_KEY"),
}

Debug API Keys

for key, value in api_keys.items():
if not value:
raise ValueError(f"Missing API key: {key}. Ensure it is set in the environment variables or .env.local.")

Qdrant Configuration

qdrant_url = "https://d32d17dc-6b1d-4e13-9bb5-94a4d9f4b457.europe-west3-0.gcp.cloud.qdrant.io:6333"
qdrant_client = QdrantClient(url=qdrant_url, api_key=api_keys["QDRANT_API_KEY"])

Tool Implementations

def query_fofa(query: str = "eenadu.net") -> dict:
base64_query = base64.b64encode(query.encode("utf-8")).decode("utf-8")
response = requests.get(
f"https://fofa.info/api/v1/search/all?key={api_keys['FOFA_API_KEY']}&qbase64={base64_query}&size=100&fields=jarm,host,domain,title,ip,port,protocol,base_protocol,country,country_name,region,city,longitude,latitude,as_number,as_organization,os,server,icp,header,banner,cert,link,certs_issuer_org,certs_issuer_cn,certs_subject_org,certs_subject_cn,tls_ja3s,tls_version"
)
return response.json()

def query_crtsh(domain: str = "eenadu.net") -> list:
response = requests.get(f"https://crt.sh/?q={domain}&output=json")
return response.json()

def query_whoxy(email_or_org: str = "eenadu.net") -> dict:
response = requests.get(
f"https://api.whoxy.com/?key={api_keys['WHOXY_API_KEY']}&reverse=whois&search={email_or_org}"
)
return response.json()

def query_api_ninjas(domain: str = "eenadu.net") -> dict:
response = requests.get(
f"https://api.api-ninjas.com/v1/dnslookup?domain={domain}",
headers={"X-Api-Key": api_keys["API_NINJAS_KEY"]},
)
return response.json()

def query_networkcalc(domain: str = "eenadu.net") -> dict:
response = requests.get(f"https://networkcalc.com/api/dns/lookup/{domain}")
return response.json()

def perform_reverse_lookups(domain: str, manager_memory: dict):
"""Perform WHOXY lookups and store related domains."""
reverse_data = query_whoxy(domain)
email = reverse_data.get("email_address", "")
organization = reverse_data.get("organization_name", "")
owner = reverse_data.get("owner_name", "")

# Perform additional reverse lookups
related_by_email = query_whoxy(email) if email else {}
related_by_org = query_whoxy(organization) if organization else {}
related_by_owner = query_whoxy(owner) if owner else {}

# Filter out unrelated domains
in_scope_domains = set()
for lookup_data in [related_by_email, related_by_org, related_by_owner]:
    for domain_info in lookup_data.get("searchResult", {}).get("domains", []):
        if domain in domain_info or is_in_scope(domain_info):
            in_scope_domains.add(domain_info)

# Store results in memory
manager_memory["related_domains"] = list(in_scope_domains)

def is_in_scope(domain: str) -> bool:
"""Filter domains based on scope criteria."""
out_of_scope_keywords = ["example.com", "google.com", "microsoft.com", "cloudflare.com"]
return not any(keyword in domain for keyword in out_of_scope_keywords)

Define Agents

manager_agent = Agent(
name="Manager Agent",
role="Coordinate data collection and refinement",
goal="Collect user input, oversee tasks, and perform WHOXY lookups for related domains.",
backstory="Strategically manages all agents and ensures focused execution.",
tools=[],
verbose=True,
llm="gpt-4o",
markdown=True,
memory={"short_term": {}, "long_term": qdrant_client},
)

tool_agent = Agent(
name="Tool Agent",
role="Run initial tool queries",
goal="Gather subdomains, DNS records, reverse WHOIS data, and FOFA data.",
backstory="Tool specialist to run initial information-gathering queries using all available tools.",
tools=[query_fofa, query_crtsh, query_whoxy, query_api_ninjas, query_networkcalc],
verbose=True,
llm="gpt-4o",
markdown=True,
memory={"short_term": {}, "long_term": qdrant_client},
)

sr_pentester_agent = Agent(
name="Senior Pentester",
role="Analyze tool results for vulnerabilities",
goal="Investigate vulnerabilities and organize data for deep dives.",
backstory="An expert in penetration testing and data correlation.",
tools=[],
verbose=True,
llm="gpt-4o",
markdown=True,
memory={"short_term": {}, "long_term": qdrant_client},
)

ciso_agent = Agent(
name="CISO Agent",
role="Executive Security Analysis",
goal="Review findings and provide high-level recommendations.",
backstory="Provides strategic insights and ensures alignment with organizational goals.",
tools=[],
verbose=True,
llm="gpt-4o",
markdown=True,
memory={"short_term": {}, "long_term": qdrant_client},
)

Define Tasks

tool_query_task = Task(
name="tool_query_task",
description="Run all tools to gather initial domain data.",
expected_output="Raw data from all tools stored in Qdrant.",
agent=tool_agent,
)

pentester_analysis_task = Task(
name="pentester_analysis_task",
description="Analyze tool output for vulnerabilities and prioritize areas of focus.",
expected_output="Organized and prioritized findings stored in Qdrant.",
agent=sr_pentester_agent,
)

ciso_review_task = Task(
name="ciso_review_task",
description="Review pentester findings and provide strategic recommendations.",
expected_output="High-level executive recommendations stored in Qdrant.",
agent=ciso_agent,
)

Combine Agents and Tasks

agents = PraisonAIAgents(
agents=[manager_agent, tool_agent, sr_pentester_agent, ciso_agent],
tasks=[tool_query_task, pentester_analysis_task, ciso_review_task],
verbose=True,
process="hierarchical",
manager_llm="gpt-4o",
)

Main Execution Workflow

def main():
print("Welcome to the Hierarchical Pentesting Workflow!")
domain = "eenadu.net" # Hardcoded domain for consistency

# Ensure the domain is correctly set in manager memory
if "domain" not in manager_agent.memory["short_term"] or manager_agent.memory["short_term"]["domain"] != domain:
    manager_agent.memory["short_term"]["domain"] = domain

# Execute the workflow
result = agents.start()

# Serialize TaskOutput to JSON-serializable format
serialized_result = []
for task in agents.tasks:
    if hasattr(task, "name") and hasattr(task, "output"):
        serialized_result.append({
            "task_name": task.name,
            "output": task.output.to_dict() if hasattr(task.output, "to_dict") else str(task.output),
        })
    else:
        print(f"Skipping task: {task} due to missing attributes.")

# Output results
print("Workflow Results:")
print(json.dumps(serialized_result, indent=4))

if name == "main":
main()

@MervinPraison
Copy link
Owner

@Cyarun Did you try with gpt-4o model ?

@Cyarun
Copy link
Author

Cyarun commented Jan 14, 2025 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants