Instalar dependências
pip install httpx pandas matplotlib seaborn arxiv
Para análise completa
pip install -r requirements.txt
Análise completa do ecossistema de IA
python ai_bubble_detector.py
Output esperado:
🤖 AI/AGI Bubble Detector - Iniciando análise...
📊 Coletando dados de múltiplas fontes...
🧮 Calculando métricas de bolha...
#
🎯 Índice de Bolha: 64.3%
🚦 Nível de Risco: Alto
Compara ChatGPT, Claude, Stable Diffusion, etc.
python compare_ai_technologies.py
Output:
🏆 Ranking por Índice de Bolha:
🔴 ChatGPT/OpenAI | Bolha: 72.3% | Risco: Crítico
🟠 Stable Diffusion | Bolha: 58.1% | Risco: Alto
🟡 LangChain | Bolha: 45.2% | Risco: Moderado
🟢 Transformers | Bolha: 28.9% | Risco: Baixo
Iniciar API
uvicorn backend_api:app --reload
Analisar IA via API
curl -X POST "http://localhost:8000/api/analyze" \
-H "Content-Type: application/json" \
-d '{
"github_owner": "openai",
"github_repo": "openai-python",
"subreddit": "OpenAI"
}'
---
import asyncio
from ai_bubble_detector import AIBubbleCalculator, AIDataFetcher
async def analyze_chatgpt():
    """Collect ChatGPT/OpenAI ecosystem signals and print its bubble metrics.

    Returns:
        The metrics dict produced by AIBubbleCalculator.calculate_bubble_index.
    """
    # Data sources sampled for the ChatGPT/OpenAI ecosystem.
    sources = {
        "pypi_packages": ["openai"],
        "github_repos": [("openai", "openai-python")],
        "subreddits": ["OpenAI", "ChatGPT"],
        "huggingface_models": ["gpt2"],
    }

    # Fetch each signal in turn.
    collected = {}
    collected["pypi"] = await AIDataFetcher.fetch_ai_pypi_downloads(
        sources["pypi_packages"]
    )
    collected["github"] = await AIDataFetcher.fetch_ai_github_repos(
        {"openai": sources["github_repos"][0]}
    )
    collected["reddit"] = await AIDataFetcher.fetch_ai_reddit_sentiment(
        sources["subreddits"]
    )

    # Aggregate into the bubble metrics and report.
    metrics = AIBubbleCalculator.calculate_bubble_index(collected)
    print(f"ChatGPT Bubble Index: {metrics['bubble_index']:.1%}")
    print(f"Risk Level: {metrics['risk_level']}")
    if metrics['red_flags']:
        print("\nRed Flags:")
        for flag in metrics['red_flags']:
            print(f" {flag}")
    return metrics
Executar
metrics = asyncio.run(analyze_chatgpt())
import asyncio
import time
from datetime import datetime
import json
async def track_bubble_over_time(hours=24, interval_minutes=60):
    """Track the bubble index over time, sampling every `interval_minutes`.

    Args:
        hours: Total tracking window, in hours.
        interval_minutes: Delay between consecutive samples, in minutes.

    Returns:
        List of per-sample dicts (timestamp, bubble_index, adoption, hype,
        risk_level); also persisted incrementally to bubble_tracking.json.
    """
    # Bug fix: the original looped `hours` times regardless of the interval,
    # so e.g. interval_minutes=30 sampled for only 12 of the 24 hours.
    # Derive the sample count from window / interval instead (defaults
    # hours=24, interval_minutes=60 still yield 24 samples, as before).
    iterations = max(1, int(hours * 60 / interval_minutes))
    history = []
    for i in range(iterations):
        print(f"\n📊 Coleta {i+1}/{iterations} - {datetime.now()}")

        # Collect and score.
        data = await AIBubbleCalculator.fetch_all_data()
        metrics = AIBubbleCalculator.calculate_bubble_index(data)

        history.append({
            "timestamp": datetime.now().isoformat(),
            "bubble_index": metrics["bubble_index"],
            "adoption": metrics["adoption"],
            "hype": metrics["hype"],
            "risk_level": metrics["risk_level"]
        })

        # Persist incrementally so a crash keeps every sample taken so far.
        with open("bubble_tracking.json", "w") as f:
            json.dump(history, f, indent=2)

        print(f" Bubble Index: {metrics['bubble_index']:.1%}")
        print(f" Risk: {metrics['risk_level']}")

        # Sleep until the next interval (skipped after the final sample).
        if i < iterations - 1:
            await asyncio.sleep(interval_minutes * 60)
    return history
Executar tracking por 24 horas
history = asyncio.run(track_bubble_over_time(hours=24, interval_minutes=60))
import asyncio
import smtplib
from email.message import EmailMessage
async def monitor_with_alerts(threshold=0.7, check_interval_hours=1):
    """Run forever, emailing an alert whenever the bubble index crosses `threshold`.

    Args:
        threshold: Bubble-index level (0-1) above which an alert email is sent.
        check_interval_hours: Hours to wait between checks.
    """
    while True:
        # One analysis pass.
        snapshot = await AIBubbleCalculator.fetch_all_data()
        metrics = AIBubbleCalculator.calculate_bubble_index(snapshot)
        bubble_index = metrics["bubble_index"]
        risk_level = metrics["risk_level"]
        print(f"[{datetime.now()}] Bubble Index: {bubble_index:.1%} - {risk_level}")

        # Alert only when the configured threshold is exceeded.
        if bubble_index > threshold:
            send_alert(
                subject=f"🚨 AI Bubble Alert: {risk_level}",
                body=f"""
Bubble Index ultrapassou threshold de {threshold:.0%}!
Current Index: {bubble_index:.1%}
Risk Level: {risk_level}
Métricas:
- Adoção: {metrics['adoption']:.1%}
- Hype: {metrics['hype']:.1%}
- Divergência: {metrics['divergence']:.1%}
Red Flags:
{chr(10).join(metrics['red_flags'])}
""",
            )
            print("📧 Alerta enviado!")

        # Sleep until the next check.
        await asyncio.sleep(check_interval_hours * 3600)
def send_alert(subject: str, body: str):
    """Send an alert email via Gmail SMTP with STARTTLS.

    Args:
        subject: Email subject line.
        body: Plain-text email body.
    """
    message = EmailMessage()
    message['Subject'] = subject
    message['From'] = "bubble-detector@example.com"
    message['To'] = "your-email@example.com"
    message.set_content(body)

    # NOTE(review): credentials are hard-coded placeholders — load them from
    # environment variables or a secrets manager before real deployment.
    with smtplib.SMTP('smtp.gmail.com', 587) as smtp:
        smtp.starttls()
        smtp.login("your-email@example.com", "your-password")
        smtp.send_message(message)
Executar monitor
asyncio.run(monitor_with_alerts(threshold=0.7, check_interval_hours=1))
import streamlit as st
import asyncio
import pandas as pd
import plotly.graph_objects as go
from ai_bubble_detector import AIBubbleCalculator
# Streamlit page chrome: wide layout with a magnifier favicon.
st.set_page_config(
    page_title="AI Bubble Detector",
    page_icon="🔍",
    layout="wide",
)
st.title("🤖 AI Bubble Detector Dashboard")
Botão para análise
# On demand: collect data, score it, and render the dashboard widgets.
if st.button("🔄 Executar Análise"):
    with st.spinner("Coletando dados..."):
        # Run the async collection pipeline synchronously for Streamlit.
        data = asyncio.run(AIBubbleCalculator.fetch_all_data())
        metrics = AIBubbleCalculator.calculate_bubble_index(data)

    # Headline metrics, one per column.
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Bubble Index", f"{metrics['bubble_index']:.1%}", delta=None)
    with col2:
        st.metric("Risk Level", metrics['risk_level'], delta=None)
    with col3:
        st.metric("Adoção", f"{metrics['adoption']:.1%}", delta=None)
    with col4:
        st.metric("Hype", f"{metrics['hype']:.1%}", delta=None)

    # Radar chart over the five component scores (all expected in [0, 1]).
    st.subheader("📊 Análise Multidimensional")
    fig = go.Figure(data=go.Scatterpolar(
        r=[
            metrics['adoption'],
            metrics['hype'],
            metrics['research_momentum'],
            metrics['network'],
            metrics['commoditization'],
        ],
        theta=['Adoção', 'Hype', 'Pesquisa', 'Rede', 'Commoditização'],
        fill='toself',
    ))
    fig.update_layout(
        polar=dict(radialaxis=dict(visible=True, range=[0, 1])),
        showlegend=False,
    )
    st.plotly_chart(fig, use_container_width=True)

    # Surface any red flags; otherwise show an all-clear.
    if metrics['red_flags']:
        st.subheader("⚠️ Red Flags Detectados")
        for flag in metrics['red_flags']:
            st.warning(flag)
    else:
        st.success("✅ Nenhum red flag detectado")
Rodar: streamlit run dashboard.py
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
class BubbleDataStore:
    """Persist and query the history of bubble analyses in a SQL database."""

    def __init__(self, db_url):
        """Create the engine and session factory for `db_url` (SQLAlchemy URL)."""
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def save_analysis(self, metrics: dict):
        """Insert one analysis row into bubble_metrics (schema from init.sql).

        Args:
            metrics: Dict with adoption, hype, network, feedback, bubble_index,
                risk_level and divergence keys.
        """
        session = self.Session()
        try:
            # Fix: raw SQL strings must be wrapped in text() — SQLAlchemy
            # 1.4 deprecated (and 2.0 removed) passing bare strings to
            # Session.execute().
            session.execute(
                text("""
                INSERT INTO bubble_metrics
                (asset, timestamp, adoption, hype, investment,
                network, feedback, bubble_index, risk_level, divergence)
                VALUES
                (:asset, :timestamp, :adoption, :hype, :investment,
                :network, :feedback, :bubble_index, :risk_level, :divergence)
                """),
                {
                    "asset": "AI-General",
                    # Epoch milliseconds, matching the column's convention.
                    "timestamp": int(datetime.now().timestamp() * 1000),
                    "adoption": metrics["adoption"],
                    "hype": metrics["hype"],
                    "investment": 0.5,  # placeholder until an investment signal exists
                    "network": metrics["network"],
                    "feedback": metrics["feedback"],
                    "bubble_index": metrics["bubble_index"],
                    "risk_level": metrics["risk_level"],
                    "divergence": metrics["divergence"]
                }
            )
            session.commit()
            print("✅ Análise salva no banco")
        except Exception as e:
            # Best-effort persistence: roll back and report, don't crash callers.
            session.rollback()
            print(f"❌ Erro ao salvar: {e}")
        finally:
            session.close()

    def get_historical_trend(self, days=30):
        """Return the last `days` days of metrics as a DataFrame (oldest first).

        Args:
            days: Size of the look-back window, in days.
        """
        # Fix: bind `days` as a parameter instead of f-string interpolation
        # into the SQL text (injection-prone and defeats statement caching).
        # The interval arithmetic is PostgreSQL-specific.
        query = text("""
            SELECT timestamp, bubble_index, risk_level, adoption, hype
            FROM bubble_metrics
            WHERE asset = 'AI-General'
            AND timestamp > extract(epoch from now() - interval '1 day' * :days) * 1000
            ORDER BY timestamp ASC
        """)
        return pd.read_sql(query, self.engine, params={"days": days})
# Example usage
store = BubbleDataStore("postgresql://user:pass@localhost/bubbledb")

# Persist one fresh analysis.
metrics = asyncio.run(analyze_ai_bubble())
store.save_analysis(metrics)

# Pull back the 30-day history.
df = store.get_historical_trend(days=30)
print(df.head())
---
class CustomAICalculator(AIBubbleCalculator):
    """Calculator with adoption weights tuned per target domain."""

    @staticmethod
    def calculate_adoption(data: Dict, domain: str = "general") -> float:
        """Adoption score as a weighted mix of PyPI/GitHub/HuggingFace signals.

        Args:
            data: Per-source adoption scores keyed "pypi", "github", "hf".
                NOTE(review): assumed to be normalized floats in [0, 1] —
                confirm against AIBubbleCalculator's data shape.
            domain: One of "enterprise", "research", "consumer"; anything
                else falls back to the general-purpose weights.

        Returns:
            Weighted adoption score; missing sources contribute 0.0.
        """
        if domain == "enterprise":
            # Enterprise: GitHub presence matters more.
            weights = {"pypi": 0.2, "github": 0.5, "hf": 0.3}
        elif domain == "research":
            # Research: arXiv and HF matter more.
            weights = {"pypi": 0.2, "github": 0.2, "hf": 0.6}
        elif domain == "consumer":
            # Consumer: PyPI (usage) matters most.
            weights = {"pypi": 0.6, "github": 0.2, "hf": 0.2}
        else:
            weights = {"pypi": 0.35, "github": 0.30, "hf": 0.35}
        # Fix: the original ended at a "... implementar lógica" placeholder and
        # implicitly returned None despite the -> float annotation.
        return sum(w * float(data.get(source, 0.0)) for source, w in weights.items())
import redis
import hashlib
from functools import wraps
Redis para cache
cache = redis.Redis(host='localhost', port=6379, db=0)
def cached(ttl=3600):
    """Decorator caching an async function's JSON-serializable result in Redis.

    Args:
        ttl: Cache-entry lifetime in seconds.

    Returns:
        A decorator wrapping an async function with read-through caching.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Fix: the original keyed only on args, so calls differing only
            # in keyword arguments collided on the same cache entry. Sort
            # kwargs so the key is order-independent.
            raw = f"{args!r}:{sorted(kwargs.items())!r}"
            key = f"{func.__name__}:{hashlib.md5(raw.encode()).hexdigest()}"
            # Fix: compare against None — redis returns None on a miss, and a
            # truthiness test would also treat falsy payloads as misses.
            cached_value = cache.get(key)
            if cached_value is not None:
                return json.loads(cached_value)
            # Miss: compute, then store with expiry.
            result = await func(*args, **kwargs)
            # default=str stringifies non-JSON types (datetimes etc.) best-effort.
            cache.setex(key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator
Uso
# Example: cache PyPI stat lookups for one hour.
@cached(ttl=3600)
async def fetch_data_with_cache(package):
    """Fetch PyPI stats for `package`, served from Redis when fresh."""
    return await fetch_pypi_stats(package)
---
"""
Use Case: Avaliar investimentos em startups de IA
"""
async def evaluate_startup_risk(startup_name: str, config: dict):
    """Classify an AI startup's investment risk from its bubble metrics.

    Args:
        startup_name: Name/identifier of the startup to analyze.
        config: Data-source configuration passed to collect_tech_data.

    Returns:
        Dict with the startup name, a recommendation, a short explanation,
        and the raw metrics.
    """
    data = await collect_tech_data(startup_name, config)
    metrics = calculate_bubble_metrics(data)

    # (lower bound, recommendation, explanation) — first matching band wins.
    bands = [
        (0.7, "AVOID - Em bolha", "Alto risco de correção"),
        (0.5, "CAUTION - Avaliação elevada", "Aguardar correção ou valorizar com desconto"),
        (0.3, "MONITOR - Crescimento acelerado", "Avaliar fundamentals com cuidado"),
    ]
    recommendation = "CONSIDER - Crescimento saudável"
    explanation = "Alinhamento entre hype e adoção"
    for lower_bound, rec, why in bands:
        if metrics['bubble_index'] > lower_bound:
            recommendation, explanation = rec, why
            break

    return {
        "startup": startup_name,
        "recommendation": recommendation,
        "explanation": explanation,
        "metrics": metrics
    }
"""
Use Case: Detectar tendências emergentes vs modismos
"""
async def classify_trend(technology: str, config: dict):
    """Classify a technology as hype cycle, sustainable growth, or mature.

    Samples the bubble index once a month for 12 months, then classifies
    from the trend direction and volatility. Note this blocks ~12 months of
    wall-clock time — it is meant to run inside a long-lived scheduler.

    Args:
        technology: Technology identifier passed to collect_tech_data.
        config: Data-source configuration.

    Returns:
        One of "FOMO/Hype Cycle", "Sustainable Growth",
        "Mature Technology", or "Uncertain".
    """
    # Fix: the original used np.std, but numpy is never imported anywhere in
    # this file. statistics.pstdev is the stdlib equivalent of np.std's
    # default (population standard deviation, ddof=0).
    import statistics

    history = []
    for _ in range(12):  # 12 monthly samples
        data = await collect_tech_data(technology, config)
        metrics = calculate_bubble_metrics(data)
        history.append(metrics['bubble_index'])
        await asyncio.sleep(30 * 24 * 3600)  # 30 days

    trend = "INCREASING" if history[-1] > history[0] else "DECREASING"
    volatility = statistics.pstdev(history)

    if volatility > 0.2 and trend == "INCREASING":
        classification = "FOMO/Hype Cycle"
    elif volatility < 0.1 and history[-1] < 0.3:
        classification = "Sustainable Growth"
    elif trend == "DECREASING" and history[-1] < 0.4:
        classification = "Mature Technology"
    else:
        classification = "Uncertain"
    return classification
"""
Use Case: Due diligence automatizada para M&A
"""
async def due_diligence_report(company: str, config: dict):
    """Build an automated due-diligence report skeleton for an M&A target.

    Args:
        company: Company/technology identifier passed to collect_tech_data.
        config: Data-source configuration.

    Returns:
        Report dict with executive summary, detailed metrics, market
        position, risk factors and growth indicators ("..." marks sections
        left for manual analysis).
    """
    data = await collect_tech_data(company, config)
    metrics = calculate_bubble_metrics(data)

    # Assemble the report section by section.
    report = {"company": company, "analysis_date": datetime.now().isoformat()}
    report["executive_summary"] = {
        "bubble_index": metrics['bubble_index'],
        "risk_level": metrics['risk_level'],
        "recommendation": "..."  # derived from metrics
    }
    report["detailed_metrics"] = metrics
    report["market_position"] = {
        "adoption_percentile": "...",
        "hype_percentile": "...",
        "vs_category_avg": "..."
    }
    report["risk_factors"] = metrics.get('red_flags', [])
    report["growth_indicators"] = {
        "github_star_velocity": "...",
        "download_growth_rate": "...",
        "community_sentiment": "..."
    }
    return report
---
| Index | Interpretação | Ação Recomendada |
|-------|---------------|------------------|
| 0-30% | Crescimento saudável, boa relação hype/adoção | Acompanhar |
| 30-50% | Expectativas elevadas, crescimento acelerado | Monitorar de perto |
| 50-70% | Sinais de sobrevalorização, divergência clara | Cautela |
| 70-100% | Bolha provável, alta probabilidade de correção | Evitar/Reduzir exposição |
1. **Hype > 2x Adoção**: Expectativas muito além da realidade
2. **Breakthrough Ratio < 5%**: Estagnação em pesquisa
3. **Commoditização > 80%**: Saturação do mercado
4. **Sentiment Negativo**: Comunidade perdendo confiança

---
---
Para dúvidas ou sugestões: