# connectd/daemon.py

#!/usr/bin/env python3
"""
connectd daemon - continuous discovery and matchmaking
REWIRED TO USE CENTRAL DATABASE
"""
import time
import json
import signal
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from db import Database
from db.users import (init_users_table, get_priority_users, save_priority_match,
get_priority_user_matches, discover_host_user, mark_match_viewed)
from scoutd import scrape_github, scrape_reddit, scrape_mastodon, scrape_lobsters, scrape_lemmy, scrape_discord
from scoutd.forges import scrape_all_forges
from config import HOST_USER
from scoutd.github import analyze_github_user, get_github_user
from scoutd.signals import analyze_text
from matchd.fingerprint import generate_fingerprint, fingerprint_similarity
from matchd.overlap import find_overlap
from matchd.lost import find_matches_for_lost_builders
from introd.draft import draft_intro, summarize_human, summarize_overlap
from introd.lost_intro import draft_lost_intro, get_lost_intro_config
from introd.send import send_email
from introd.deliver import deliver_intro, determine_best_contact
from config import get_lost_config
from api import start_api_thread, update_daemon_state
from central_client import CentralClient, get_client
class DummyDb:
    """No-op stand-in for the local database.

    Scrapers expect a db object to persist humans/matches into; in CENTRAL
    mode we discard those local writes and push the scraped results to the
    central API instead.
    """

    def save_human(self, human):
        """Discard the scraped human."""
        return None

    def save_match(self, *args, **kwargs):
        """Discard the match."""
        return None

    def get_human(self, *args, **kwargs):
        """Always report the human as unknown."""
        return None

    def close(self):
        """Nothing to release."""
        return None
# daemon config: how often each cycle is allowed to run (seconds)
SCOUT_INTERVAL = 3600 * 4 # full scout every 4 hours
MATCH_INTERVAL = 3600 # check matches every hour
INTRO_INTERVAL = 3600 * 2 # send intros every 2 hours
LOST_INTERVAL = 3600 * 6 # lost builder outreach every 6 hours
# daily cap shared by stranger + priority intros (lost outreach has its own cap)
from config import MAX_INTROS_PER_DAY
# minimum overlap scores a pair must reach before a match is recorded
MIN_OVERLAP_PRIORITY = 30   # priority (host) user vs discovered human
MIN_OVERLAP_STRANGERS = 50  # discovered human vs discovered human
class ConnectDaemon:
    def __init__(self, dry_run: bool = False):
        """Wire up storage, counters, signal handlers, and instance registration.

        Args:
            dry_run: when True, intro cycles print drafts instead of sending.

        Raises:
            RuntimeError: if no central API client can be constructed
                (CONNECTD_API_KEY / CONNECTD_CENTRAL_API not configured).
        """
        # local db only for priority_users (host-specific)
        self.local_db = Database()
        init_users_table(self.local_db.conn)
        # CENTRAL for all humans/matches
        self.central = get_client()
        if not self.central:
            raise RuntimeError("CENTRAL API REQUIRED - set CONNECTD_API_KEY and CONNECTD_CENTRAL_API")
        self.log("connected to CENTRAL database")
        self.running = True  # cleared by _shutdown to stop the main loop
        self.dry_run = dry_run
        self.started_at = datetime.now()
        # timestamps of the last completed cycle of each kind (None = never ran)
        self.last_scout = None
        self.last_match = None
        self.last_intro = None
        self.last_lost = None
        # per-day send counters, reset by reset_daily_limits on date rollover
        self.intros_today = 0
        self.lost_intros_today = 0
        self.today = datetime.now().date()
        # register instance with CENTRAL so multiple daemons are visible there
        instance_id = os.environ.get('CONNECTD_INSTANCE_ID', 'daemon')
        self.central.register_instance(instance_id, os.environ.get('CONNECTD_INSTANCE_IP', 'unknown'))
        # graceful shutdown on ctrl-c / service stop
        signal.signal(signal.SIGINT, self._shutdown)
        signal.signal(signal.SIGTERM, self._shutdown)
        if HOST_USER:
            self.log(f"HOST_USER set: {HOST_USER}")
            discover_host_user(self.local_db.conn, HOST_USER)
        self._update_api_state()
def _shutdown(self, signum, frame):
print("\nconnectd: shutting down...")
self.running = False
self._update_api_state()
def _update_api_state(self):
now = datetime.now()
def secs_until(last, interval):
base = last if last else self.started_at
next_run = base + timedelta(seconds=interval)
remaining = (next_run - now).total_seconds()
return max(0, int(remaining))
update_daemon_state({
'running': self.running,
'dry_run': self.dry_run,
'last_scout': self.last_scout.isoformat() if self.last_scout else None,
'last_match': self.last_match.isoformat() if self.last_match else None,
'last_intro': self.last_intro.isoformat() if self.last_intro else None,
'last_lost': self.last_lost.isoformat() if self.last_lost else None,
'intros_today': self.intros_today,
'lost_intros_today': self.lost_intros_today,
'started_at': self.started_at.isoformat(),
'countdown_scout': secs_until(self.last_scout, SCOUT_INTERVAL),
'countdown_match': secs_until(self.last_match, MATCH_INTERVAL),
'countdown_intro': secs_until(self.last_intro, INTRO_INTERVAL),
'countdown_lost': secs_until(self.last_lost, LOST_INTERVAL),
})
def log(self, msg):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}")
def reset_daily_limits(self):
if datetime.now().date() != self.today:
self.today = datetime.now().date()
self.intros_today = 0
self.lost_intros_today = 0
self.log("reset daily intro limits")
    def scout_cycle(self):
        """run discovery - scrape to CENTRAL

        Runs each scraper in turn, collects the humans it returns, then
        bulk-pushes the whole batch to the central API.  Every source is
        wrapped in its own try/except so one failing platform cannot abort
        the rest of the cycle.
        """
        self.log("starting scout cycle (-> CENTRAL)...")
        # dummy db - scrapers save here but we push to central
        dummy_db = DummyDb()
        scraped_humans = []
        try:
            # github - returns list of humans
            # local import shadows the module-level name inside this function
            from scoutd.github import scrape_github
            gh_humans = scrape_github(dummy_db, limit_per_source=30)
            if gh_humans:
                scraped_humans.extend(gh_humans)
            self.log(f" github: {len(gh_humans) if gh_humans else 0} humans")
        except Exception as e:
            self.log(f"github scout error: {e}")
        try:
            from scoutd.reddit import scrape_reddit
            reddit_humans = scrape_reddit(dummy_db, limit_per_sub=30)
            if reddit_humans:
                scraped_humans.extend(reddit_humans)
            self.log(f" reddit: {len(reddit_humans) if reddit_humans else 0} humans")
        except Exception as e:
            self.log(f"reddit scout error: {e}")
        try:
            from scoutd.mastodon import scrape_mastodon
            masto_humans = scrape_mastodon(dummy_db, limit_per_instance=30)
            if masto_humans:
                scraped_humans.extend(masto_humans)
            self.log(f" mastodon: {len(masto_humans) if masto_humans else 0} humans")
        except Exception as e:
            self.log(f"mastodon scout error: {e}")
        try:
            # forges scraper takes no db handle - it only returns humans
            forge_humans = scrape_all_forges(limit_per_instance=30)
            if forge_humans:
                scraped_humans.extend(forge_humans)
            self.log(f" forges: {len(forge_humans) if forge_humans else 0} humans")
        except Exception as e:
            self.log(f"forge scout error: {e}")
        try:
            from scoutd.lobsters import scrape_lobsters
            lob_humans = scrape_lobsters(dummy_db)
            if lob_humans:
                scraped_humans.extend(lob_humans)
            self.log(f" lobsters: {len(lob_humans) if lob_humans else 0} humans")
        except Exception as e:
            self.log(f"lobsters scout error: {e}")
        # push all to central
        if scraped_humans:
            self.log(f"pushing {len(scraped_humans)} humans to CENTRAL...")
            try:
                created, updated = self.central.upsert_humans_bulk(scraped_humans)
                self.log(f" central: {created} created, {updated} updated")
            except Exception as e:
                # push failure is logged but not retried this cycle
                self.log(f" central push error: {e}")
        self.last_scout = datetime.now()
        stats = self.central.get_stats()
        self.log(f"scout complete: {stats.get('total_humans', 0)} humans in CENTRAL")
    def match_priority_users(self):
        """find matches for priority users (hosts) using CENTRAL data

        For each locally-registered priority user, scores overlap against all
        CENTRAL humans (min_score>=20): 10 points per shared signal plus a 20
        point location bonus, and persists pairs that reach
        MIN_OVERLAP_PRIORITY into the local priority_matches table.
        """
        priority_users = get_priority_users(self.local_db.conn)
        if not priority_users:
            return
        self.log(f"matching for {len(priority_users)} priority users (from CENTRAL)...")
        # get humans from CENTRAL
        humans = self.central.get_all_humans(min_score=20)
        for puser in priority_users:
            # use stored signals first (from discovery/scoring); signals may be
            # a JSON string or already a list
            puser_signals = []
            if puser.get('signals'):
                stored = puser['signals']
                if isinstance(stored, str):
                    try:
                        stored = json.loads(stored)
                    except:
                        stored = []
                puser_signals.extend(stored)
            # supplement with interests if no signals stored
            if not puser_signals and puser.get('interests'):
                interests = json.loads(puser['interests']) if isinstance(puser['interests'], str) else puser['interests']
                puser_signals.extend(interests)
            if not puser_signals:
                self.log(f" skipping {puser.get('name')} - no signals")
                continue
            matches_found = 0
            for human in humans:
                # skip the priority user's own accounts on each platform
                human_user = human.get('username', '').lower()
                if puser.get('github') and human_user == puser['github'].lower():
                    continue
                if puser.get('reddit') and human_user == puser['reddit'].lower():
                    continue
                if puser.get('mastodon') and human_user == puser['mastodon'].lower().split('@')[0]:
                    continue
                human_signals = human.get('signals', [])
                if isinstance(human_signals, str):
                    try:
                        human_signals = json.loads(human_signals)
                    except:
                        human_signals = []
                # 10 points per shared signal
                shared = set(puser_signals) & set(human_signals)
                overlap_score = len(shared) * 10
                # location bonus -- NOTE(review): hard-coded to seattle/pnw and
                # never compares against the priority user's own location string
                if puser.get('location') and human.get('location'):
                    if 'seattle' in str(human.get('location', '')).lower() or 'pnw' in str(human.get('location', '')).lower():
                        overlap_score += 20
                if overlap_score >= MIN_OVERLAP_PRIORITY:
                    overlap_data = {
                        'overlap_score': overlap_score,
                        'overlap_reasons': [f"shared: {', '.join(list(shared)[:5])}"] if shared else [],
                    }
                    save_priority_match(self.local_db.conn, puser['id'], human['id'], overlap_data)
                    matches_found += 1
            if matches_found:
                self.log(f" found {matches_found} matches for {puser['name'] or puser['email']}")
def match_strangers(self):
"""find matches between discovered humans - save to CENTRAL"""
self.log("matching strangers (-> CENTRAL)...")
humans = self.central.get_all_humans(min_score=40)
if len(humans) < 2:
return
fingerprints = {}
for human in humans:
fp = generate_fingerprint(human)
fingerprints[human['id']] = fp
matches_found = 0
new_matches = []
from itertools import combinations
for human_a, human_b in combinations(humans, 2):
if human_a['platform'] == human_b['platform']:
if human_a['username'] == human_b['username']:
continue
fp_a = fingerprints.get(human_a['id'])
fp_b = fingerprints.get(human_b['id'])
overlap = find_overlap(human_a, human_b, fp_a, fp_b)
if overlap and overlap["overlap_score"] >= MIN_OVERLAP_STRANGERS:
new_matches.append({
'human_a_id': human_a['id'],
'human_b_id': human_b['id'],
'overlap_score': overlap['overlap_score'],
'overlap_reasons': json.dumps(overlap.get('overlap_reasons', []))
})
matches_found += 1
# bulk push to central
if new_matches:
self.log(f"pushing {len(new_matches)} matches to CENTRAL...")
try:
created = self.central.create_matches_bulk(new_matches)
self.log(f" central: {created} matches created")
except Exception as e:
self.log(f" central push error: {e}")
self.last_match = datetime.now()
    def send_stranger_intros(self):
        """send intros using CENTRAL data

        Walks pending CENTRAL matches and emails one side of each match,
        subject to the MAX_INTROS_PER_DAY cap.  Before a real send, delivery
        is claimed through the central outreach table (presumably so multiple
        daemon instances don't double-send -- see claim_outreach).  In dry-run
        mode the drafted message is printed instead of sent.
        """
        self.reset_daily_limits()
        if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY:
            self.log("daily intro limit reached")
            return
        # get pending matches from CENTRAL
        matches = self.central.get_matches(min_score=MIN_OVERLAP_STRANGERS, limit=20)
        if self.dry_run:
            self.log(f"DRY RUN: previewing {len(matches)} potential intros")
        for match in matches:
            if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY:
                break
            # get full human data
            human_a = self.central.get_human(match['human_a_id'])
            human_b = self.central.get_human(match['human_b_id'])
            if not human_a or not human_b:
                continue
            match_data = {
                'id': match['id'],
                'human_a': human_a,
                'human_b': human_b,
                'overlap_score': match['overlap_score'],
                'overlap_reasons': match.get('overlap_reasons', ''),
            }
            # try each direction; the inner breaks stop after the first
            # successful (or previewed) send for this match
            for recipient, other in [(human_a, human_b), (human_b, human_a)]:
                # contact may be stored as a JSON string or a dict
                contact = recipient.get('contact', {})
                if isinstance(contact, str):
                    try:
                        contact = json.loads(contact)
                    except:
                        contact = {}
                email = contact.get('email')
                if not email:
                    continue
                # check if already contacted
                if self.central.already_contacted(recipient['id']):
                    continue
                # get token and interest count for recipient; best-effort, the
                # intro is still drafted without them
                try:
                    recipient_token = self.central.get_token(recipient['id'], match.get('id'))
                    interested_count = self.central.get_interested_count(recipient['id'])
                except Exception as e:
                    print(f"[intro] failed to get token/count: {e}")
                    recipient_token = None
                    interested_count = 0
                intro = draft_intro(match_data,
                                    recipient='a' if recipient == human_a else 'b',
                                    recipient_token=recipient_token,
                                    interested_count=interested_count)
                # overlap_reasons may be a JSON string; fall back to a generic
                # summary when empty or unparseable
                reasons = match.get('overlap_reasons', '')
                if isinstance(reasons, str):
                    try:
                        reasons = json.loads(reasons)
                    except:
                        reasons = []
                reason_summary = ', '.join(reasons[:3]) if reasons else 'aligned values'
                if self.dry_run:
                    print("\n" + "=" * 60)
                    print(f"TO: {recipient['username']} ({recipient['platform']})")
                    print(f"EMAIL: {email}")
                    print(f"SUBJECT: you might want to meet {other['username']}")
                    print(f"SCORE: {match['overlap_score']:.0f} ({reason_summary})")
                    print("-" * 60)
                    print("MESSAGE:")
                    print(intro['draft'])
                    print("-" * 60)
                    print("[DRY RUN - NOT SENT]")
                    print("=" * 60)
                    break
                else:
                    # claim before sending; None means another claim exists
                    outreach_id = self.central.claim_outreach(recipient['id'], match['id'], 'intro')
                    if outreach_id is None:
                        self.log(f"skipping {recipient['username']} - already claimed")
                        continue
                    success, error = send_email(
                        email,
                        f"connectd: you might want to meet {other['username']}",
                        intro['draft']
                    )
                    if success:
                        self.log(f"sent intro to {recipient['username']} ({email})")
                        self.intros_today += 1
                        self.central.complete_outreach(outreach_id, 'sent', 'email', intro['draft'])
                        break
                    else:
                        # record the failure; the other direction is still tried
                        self.log(f"failed to send to {email}: {error}")
                        self.central.complete_outreach(outreach_id, 'failed', error=error)
        self.last_intro = datetime.now()
    def send_priority_user_intros(self):
        """send intros TO priority users (hosts) about their matches

        Emails each priority user about their 'new' matches from the local
        priority_matches table; the matched human's full profile comes from
        CENTRAL.  Sends count against the shared MAX_INTROS_PER_DAY budget.
        """
        self.reset_daily_limits()
        priority_users = get_priority_users(self.local_db.conn)
        if not priority_users:
            return
        self.log(f"checking intros for {len(priority_users)} priority users...")
        for puser in priority_users:
            if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY:
                break
            # get email
            email = puser.get('email')
            if not email:
                continue
            # get their matches from local priority_matches table
            matches = get_priority_user_matches(self.local_db.conn, puser['id'], status='new', limit=5)
            if not matches:
                continue
            for match in matches:
                if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY:
                    break
                # get the matched human from CENTRAL (matched_human_id is central id)
                human_id = match.get('matched_human_id')
                if not human_id:
                    continue
                human = self.central.get_human(human_id)
                if not human:
                    continue
                # build match data for drafting; reasons may be a JSON string
                overlap_reasons = match.get('overlap_reasons', '[]')
                if isinstance(overlap_reasons, str):
                    try:
                        overlap_reasons = json.loads(overlap_reasons)
                    except:
                        overlap_reasons = []
                # display names: fall back to email local-part / username
                puser_name = puser.get('name') or puser.get('email', '').split('@')[0]
                human_name = human.get('name') or human.get('username')
                # draft intro TO priority user ABOUT the matched human
                match_data = {
                    'id': match.get('id'),
                    'human_a': {
                        'username': puser_name,
                        'platform': 'host',
                        'bio': puser.get('bio', ''),
                        'name': puser_name,
                        'signals': puser.get('signals', []),
                    },
                    'human_b': human,
                    'overlap_score': match.get('overlap_score', 0),
                    'overlap_reasons': overlap_reasons,
                }
                # try to get token for priority user (they might have a central ID);
                # best-effort, the intro works without it
                recipient_token = None
                interested_count = 0
                if puser.get('central_id'):
                    try:
                        recipient_token = self.central.get_token(puser['central_id'], match.get('id'))
                        interested_count = self.central.get_interested_count(puser['central_id'])
                    except:
                        pass
                intro = draft_intro(match_data, recipient='a',
                                    recipient_token=recipient_token,
                                    interested_count=interested_count)
                reason_summary = ', '.join(overlap_reasons[:3]) if overlap_reasons else 'aligned values'
                if self.dry_run:
                    print("\n" + "=" * 60)
                    print("PRIORITY USER INTRO")
                    print("=" * 60)
                    print(f"TO: {puser_name} ({email})")
                    print(f"ABOUT: {human_name} ({human.get('platform')})")
                    print(f"SCORE: {match.get('overlap_score', 0):.0f} ({reason_summary})")
                    print("-" * 60)
                    print("MESSAGE:")
                    print(intro['draft'])
                    print("-" * 60)
                    print("[DRY RUN - NOT SENT]")
                    print("=" * 60)
                else:
                    success, error = send_email(
                        email,
                        f"connectd: you might want to meet {human_name}",
                        intro['draft']
                    )
                    if success:
                        self.log(f"sent priority intro to {puser_name} about {human_name}")
                        self.intros_today += 1
                        # mark match as notified
                        mark_match_viewed(self.local_db.conn, match['id'])
                    else:
                        self.log(f"failed to send priority intro to {email}: {error}")
    def send_lost_builder_intros(self):
        """reach out to lost builders using CENTRAL data

        Pairs each 'lost' builder with the active builder sharing the most
        signals, drafts a personalized intro, and delivers it via whatever
        channel deliver_intro selects.  Governed by its own daily cap from the
        lost-outreach config (default 5/day), separate from MAX_INTROS_PER_DAY.
        """
        self.reset_daily_limits()
        lost_config = get_lost_config()
        if not lost_config.get('enabled', True):
            return
        max_per_day = lost_config.get('max_per_day', 5)
        if not self.dry_run and self.lost_intros_today >= max_per_day:
            self.log("daily lost builder intro limit reached")
            return
        # get lost builders from CENTRAL (only as many as remain in today's budget)
        lost_builders = self.central.get_lost_builders(
            min_score=lost_config.get('min_lost_score', 40),
            limit=max_per_day - self.lost_intros_today
        )
        # get active builders from CENTRAL
        builders = self.central.get_builders(min_score=50, limit=100)
        if not lost_builders or not builders:
            self.log("no lost builders or builders available")
            return
        if self.dry_run:
            self.log(f"DRY RUN: previewing {len(lost_builders)} lost builder intros")
        for lost in lost_builders:
            if not self.dry_run and self.lost_intros_today >= max_per_day:
                break
            # find matching builder: most shared signals wins
            # NOTE(review): the lost builder's signals are re-parsed on every
            # inner iteration - could be hoisted out of the builder loop
            best_builder = None
            best_score = 0
            for builder in builders:
                lost_signals = lost.get('signals', [])
                builder_signals = builder.get('signals', [])
                if isinstance(lost_signals, str):
                    try:
                        lost_signals = json.loads(lost_signals)
                    except:
                        lost_signals = []
                if isinstance(builder_signals, str):
                    try:
                        builder_signals = json.loads(builder_signals)
                    except:
                        builder_signals = []
                shared = set(lost_signals) & set(builder_signals)
                if len(shared) > best_score:
                    best_score = len(shared)
                    best_builder = builder
            # no builder shares even one signal -> skip this lost builder
            if not best_builder:
                continue
            lost_name = lost.get('name') or lost.get('username')
            builder_name = best_builder.get('name') or best_builder.get('username')
            draft, draft_error = draft_lost_intro(lost, best_builder, lost_config)
            if draft_error:
                self.log(f"error drafting lost intro for {lost_name}: {draft_error}")
                continue
            method, contact_info = determine_best_contact(lost)
            if self.dry_run:
                print("\n" + "=" * 60)
                print("LOST BUILDER OUTREACH")
                print("=" * 60)
                print(f"TO: {lost_name} ({lost.get('platform')})")
                print(f"DELIVERY: {method}{contact_info}")
                print(f"LOST SCORE: {lost.get('lost_potential_score', 0)}")
                print(f"INSPIRING BUILDER: {builder_name}")
                print("-" * 60)
                print("MESSAGE:")
                print(draft)
                print("-" * 60)
                print("[DRY RUN - NOT SENT]")
                print("=" * 60)
            else:
                # deliver_intro picks the channel and reports which one it used
                match_data = {
                    'human_a': best_builder,
                    'human_b': lost,
                    'overlap_score': best_score * 10,
                    'overlap_reasons': [],
                }
                success, error, delivery_method = deliver_intro(match_data, draft)
                if success:
                    self.log(f"sent lost builder intro to {lost_name} via {delivery_method}")
                    self.lost_intros_today += 1
                else:
                    self.log(f"failed to reach {lost_name} via {delivery_method}: {error}")
        self.last_lost = datetime.now()
        self.log(f"lost builder cycle complete: {self.lost_intros_today} sent today")
def run(self):
"""main daemon loop"""
self.log("connectd daemon starting (CENTRAL MODE)...")
start_api_thread()
self.log("api server started on port 8099")
if self.dry_run:
self.log("*** DRY RUN MODE - no intros will be sent ***")
self.log(f"scout interval: {SCOUT_INTERVAL}s")
self.log(f"match interval: {MATCH_INTERVAL}s")
self.log(f"intro interval: {INTRO_INTERVAL}s")
self.log(f"lost interval: {LOST_INTERVAL}s")
self.log(f"max intros/day: {MAX_INTROS_PER_DAY}")
# initial scout
self.scout_cycle()
self._update_api_state()
while self.running:
now = datetime.now()
if not self.last_scout or (now - self.last_scout).seconds >= SCOUT_INTERVAL:
self.scout_cycle()
self._update_api_state()
if not self.last_match or (now - self.last_match).seconds >= MATCH_INTERVAL:
self.match_priority_users()
self.match_strangers()
self._update_api_state()
if not self.last_intro or (now - self.last_intro).seconds >= INTRO_INTERVAL:
self.send_stranger_intros()
self.send_priority_user_intros()
self._update_api_state()
if not self.last_lost or (now - self.last_lost).seconds >= LOST_INTERVAL:
self.send_lost_builder_intros()
self._update_api_state()
time.sleep(60)
self.log("connectd daemon stopped")
self.local_db.close()
def run_daemon(dry_run=False):
    """Construct a ConnectDaemon and block in its main loop."""
    ConnectDaemon(dry_run=dry_run).run()
if __name__ == '__main__':
    # `sys` is already imported at module top; the duplicate local import was
    # redundant.  The only CLI flag is --dry-run (preview intros, send nothing).
    run_daemon(dry_run='--dry-run' in sys.argv)