diff --git a/api.py b/api.py index d1b0256..2c8522e 100644 --- a/api.py +++ b/api.py @@ -12,14 +12,22 @@ import threading from http.server import HTTPServer, BaseHTTPRequestHandler from datetime import datetime -from db import Database -from db.users import get_priority_users, get_priority_user_matches, get_priority_user +from central_client import CentralClient, get_client +from profile_page import render_profile # central API config import requests CENTRAL_API = os.environ.get('CONNECTD_CENTRAL_API', '') CENTRAL_KEY = os.environ.get('CONNECTD_API_KEY', '') +# global client instance +_central = None +def get_central(): + global _central + if _central is None: + _central = get_client() + return _central + API_PORT = int(os.environ.get('CONNECTD_API_PORT', 8099)) # shared state (updated by daemon) @@ -180,31 +188,18 @@ async function loadStats() { uptime = hrs + 'h ' + mins + 'm'; } - $('status').innerHTML = 'daemon ' + (h.running ? 'ON' : 'OFF') + ' | ' + uptime + ' | ' + h.intros_today + ' today'; - - var centralHtml = ''; - if (s.central && !s.central.error) { - centralHtml = '
people who need to see that someone like them made it
'; + var html = 'people who need to see that someone like them made it
'; if (!data.matches || data.matches.length === 0) { - html += ''; + html += ''; } for (var i = 0; i < (data.matches || []).length; i++) { var m = data.matches[i]; - html += 'no human found with username: {username}
back'.encode()) + return + + matches = central.get_matches(limit=10000) + match_count = sum(1 for m in matches if m.get('human_a_id') == human['id'] or m.get('human_b_id') == human['id']) + + html = render_profile(human, match_count=match_count) + + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(html.encode()) + + except Exception as e: + self._send_json({'error': str(e)}, 500) + + def _handle_profile_by_id(self): + """render profile page by human ID""" + path = self.path.split('?')[0] + parts = path.strip('/').split('/') + + if len(parts) != 2: + self._send_json({'error': 'invalid path'}, 400) + return + + try: + human_id = int(parts[1]) + except ValueError: + self._send_json({'error': 'invalid id'}, 400) + return + + try: + central = get_central() + human = central.get_human(human_id) + + if not human: + self.send_response(404) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(f'no human found with id: {human_id}
back'.encode()) + return + + matches = central.get_matches(limit=10000) + match_count = sum(1 for m in matches if m.get('human_a_id') == human['id'] or m.get('human_b_id') == human['id']) + + html = render_profile(human, match_count=match_count) + + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(html.encode()) + + except Exception as e: + self._send_json({'error': str(e)}, 500) + + def _handle_human_full_json(self): + """return full human data as JSON""" + path = self.path.split('?')[0] + parts = path.strip('/').split('/') + + if len(parts) != 4 or parts[0] != 'api' or parts[1] != 'humans' or parts[3] != 'full': + self._send_json({'error': 'invalid path'}, 400) + return + + try: + human_id = int(parts[2]) + except ValueError: + self._send_json({'error': 'invalid id'}, 400) + return + + try: + central = get_central() + human = central.get_human(human_id) + + if not human: + self._send_json({'error': 'not found'}, 404) + return + + for field in ['signals', 'negative_signals', 'reasons', 'contact', 'extra']: + if field in human and isinstance(human[field], str): + try: + human[field] = json.loads(human[field]) + except: + pass + + matches = central.get_matches(limit=10000) + human['match_count'] = sum(1 for m in matches if m.get('human_a_id') == human['id'] or m.get('human_b_id') == human['id']) + + self._send_json(human) + + except Exception as e: + self._send_json({'error': str(e)}, 500) + + + def run_api_server(): """run the API server in a thread""" server = HTTPServer(('0.0.0.0', API_PORT), APIHandler) print(f"connectd api running on port {API_PORT}") server.serve_forever() - def start_api_thread(): """start API server in background thread""" thread = threading.Thread(target=run_api_server, daemon=True) diff --git a/central_client.py b/central_client.py index b50610e..ff2ccbb 100644 --- a/central_client.py +++ b/central_client.py @@ -178,6 +178,27 @@ class CentralClient: return False +# convenience function + + # === TOKENS === + + def get_token(self, user_id: int, match_id: int = None) -> str: + """get or create a token for a user""" + params = {} + if match_id: + params['match_id'] = match_id + result = self._get(f'/api/token/{user_id}', params) + return result.get('token') + + def get_interested_count(self, user_id: int) -> int: + """get count of people interested in this user""" + try: + result = self._get(f'/api/interested_count/{user_id}') + return result.get('count', 0) + except: + return 0 + + # convenience function def get_client() -> CentralClient: return CentralClient() diff --git a/daemon.py b/daemon.py index dc7876c..798f393 100644 --- a/daemon.py +++ b/daemon.py @@ -1,12 +1,7 @@ #!/usr/bin/env python3 """ connectd daemon - continuous discovery and matchmaking - -two modes of operation: -1. priority matching: find matches FOR hosts who run connectd -2. altruistic matching: connect strangers to each other - -runs continuously, respects rate limits, sends intros automatically +REWIRED TO USE CENTRAL DATABASE """ import time @@ -19,7 +14,7 @@ from pathlib import Path from db import Database from db.users import (init_users_table, get_priority_users, save_priority_match, - get_priority_user_matches, discover_host_user) + get_priority_user_matches, discover_host_user, mark_match_viewed) from scoutd import scrape_github, scrape_reddit, scrape_mastodon, scrape_lobsters, scrape_lemmy, scrape_discord from scoutd.forges import scrape_all_forges from config import HOST_USER @@ -34,32 +29,41 @@ from introd.send import send_email from introd.deliver import deliver_intro, determine_best_contact from config import get_lost_config from api import start_api_thread, update_daemon_state +from central_client import CentralClient, get_client + + +class DummyDb: + """dummy db that does nothing - scrapers save here but we push to central""" + def save_human(self, human): pass + def save_match(self, *args, **kwargs): pass + def get_human(self, *args, **kwargs): return None + def close(self): pass + # daemon config SCOUT_INTERVAL = 3600 * 4 # full scout every 4 hours MATCH_INTERVAL = 3600 # check matches every hour INTRO_INTERVAL = 3600 * 2 # send intros every 2 hours -LOST_INTERVAL = 3600 * 6 # lost builder outreach every 6 hours (lower volume) +LOST_INTERVAL = 3600 * 6 # lost builder outreach every 6 hours from config import MAX_INTROS_PER_DAY -# central coordination (optional - for distributed instances) -try: - from central_client import CentralClient - CENTRAL_ENABLED = bool(os.environ.get('CONNECTD_API_KEY')) -except ImportError: - CENTRAL_ENABLED = False - CentralClient = None # from config.py -MIN_OVERLAP_PRIORITY = 30 # min score for priority user matches -MIN_OVERLAP_STRANGERS = 50 # higher bar for stranger intros +MIN_OVERLAP_PRIORITY = 30 +MIN_OVERLAP_STRANGERS = 50 class ConnectDaemon: def __init__(self, dry_run=False): - self.db = Database() - init_users_table(self.db.conn) - purged = self.db.purge_disqualified() - if any(purged.values()): - self.log(f"purged disqualified: {purged}") + # local db only for priority_users (host-specific) + self.local_db = Database() + init_users_table(self.local_db.conn) + + # CENTRAL for all humans/matches + self.central = get_client() + if not self.central: + raise RuntimeError("CENTRAL API REQUIRED - set CONNECTD_API_KEY and CONNECTD_CENTRAL_API") + + self.log("connected to CENTRAL database") + self.running = True self.dry_run = dry_run self.started_at = datetime.now() @@ -69,30 +73,19 @@ class ConnectDaemon: self.last_lost = None self.intros_today = 0 self.lost_intros_today = 0 - - # central coordination - self.central = None - if CENTRAL_ENABLED: - try: - self.central = CentralClient() - instance_id = os.environ.get('CONNECTD_INSTANCE_ID', 'unknown') - self.central.register_instance(instance_id, os.environ.get('CONNECTD_INSTANCE_IP', 'unknown')) - self.log(f"connected to central API as {instance_id}") - except Exception as e: - self.log(f"central API unavailable: {e}") - self.central = None self.today = datetime.now().date() - # handle shutdown gracefully + # register instance + instance_id = os.environ.get('CONNECTD_INSTANCE_ID', 'daemon') + self.central.register_instance(instance_id, os.environ.get('CONNECTD_INSTANCE_IP', 'unknown')) + signal.signal(signal.SIGINT, self._shutdown) signal.signal(signal.SIGTERM, self._shutdown) - # auto-discover host user from env if HOST_USER: self.log(f"HOST_USER set: {HOST_USER}") - discover_host_user(self.db.conn, HOST_USER) + discover_host_user(self.local_db.conn, HOST_USER) - # update API state self._update_api_state() def _shutdown(self, signum, frame): @@ -101,10 +94,8 @@ class ConnectDaemon: self._update_api_state() def _update_api_state(self): - """update API state for HA integration""" now = datetime.now() - # calculate countdowns - if no cycle has run, use started_at def secs_until(last, interval): base = last if last else self.started_at next_run = base + timedelta(seconds=interval) @@ -128,123 +119,116 @@ class ConnectDaemon: }) def log(self, msg): - """timestamped log""" print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}") def reset_daily_limits(self): - """reset daily intro count""" if datetime.now().date() != self.today: self.today = datetime.now().date() self.intros_today = 0 self.lost_intros_today = 0 - - # central coordination - self.central = None - if CENTRAL_ENABLED: - try: - self.central = CentralClient() - instance_id = os.environ.get('CONNECTD_INSTANCE_ID', 'unknown') - self.central.register_instance(instance_id, os.environ.get('CONNECTD_INSTANCE_IP', 'unknown')) - self.log(f"connected to central API as {instance_id}") - except Exception as e: - self.log(f"central API unavailable: {e}") - self.central = None self.log("reset daily intro limits") def scout_cycle(self): - """run discovery on all platforms""" - self.log("starting scout cycle...") + """run discovery - scrape to CENTRAL""" + self.log("starting scout cycle (-> CENTRAL)...") + + # dummy db - scrapers save here but we push to central + dummy_db = DummyDb() + scraped_humans = [] try: - scrape_github(self.db, limit_per_source=30) + # github - returns list of humans + from scoutd.github import scrape_github + gh_humans = scrape_github(dummy_db, limit_per_source=30) + if gh_humans: + scraped_humans.extend(gh_humans) + self.log(f" github: {len(gh_humans) if gh_humans else 0} humans") except Exception as e: self.log(f"github scout error: {e}") try: - scrape_reddit(self.db, limit_per_sub=30) + from scoutd.reddit import scrape_reddit + reddit_humans = scrape_reddit(dummy_db, limit_per_sub=30) + if reddit_humans: + scraped_humans.extend(reddit_humans) + self.log(f" reddit: {len(reddit_humans) if reddit_humans else 0} humans") except Exception as e: self.log(f"reddit scout error: {e}") try: - scrape_mastodon(self.db, limit_per_instance=30) - - # scrape self-hosted git forges (highest signal) - self.log("scraping self-hosted git forges...") - try: - forge_humans = scrape_all_forges(limit_per_instance=30) - for h in forge_humans: - self.db.upsert_human(h) - self.log(f" forges: {len(forge_humans)} humans") - except Exception as e: - self.log(f" forge scrape error: {e}") + from scoutd.mastodon import scrape_mastodon + masto_humans = scrape_mastodon(dummy_db, limit_per_instance=30) + if masto_humans: + scraped_humans.extend(masto_humans) + self.log(f" mastodon: {len(masto_humans) if masto_humans else 0} humans") except Exception as e: self.log(f"mastodon scout error: {e}") try: - scrape_lobsters(self.db) + forge_humans = scrape_all_forges(limit_per_instance=30) + if forge_humans: + scraped_humans.extend(forge_humans) + self.log(f" forges: {len(forge_humans) if forge_humans else 0} humans") + except Exception as e: + self.log(f"forge scout error: {e}") + + try: + from scoutd.lobsters import scrape_lobsters + lob_humans = scrape_lobsters(dummy_db) + if lob_humans: + scraped_humans.extend(lob_humans) + self.log(f" lobsters: {len(lob_humans) if lob_humans else 0} humans") except Exception as e: self.log(f"lobsters scout error: {e}") - try: - scrape_lemmy(self.db, limit_per_community=30) - except Exception as e: - self.log(f"lemmy scout error: {e}") - - try: - scrape_discord(self.db, limit_per_channel=50) - except Exception as e: - self.log(f"discord scout error: {e}") + # push all to central + if scraped_humans: + self.log(f"pushing {len(scraped_humans)} humans to CENTRAL...") + try: + created, updated = self.central.upsert_humans_bulk(scraped_humans) + self.log(f" central: {created} created, {updated} updated") + except Exception as e: + self.log(f" central push error: {e}") self.last_scout = datetime.now() - stats = self.db.stats() - self.log(f"scout complete: {stats['total_humans']} humans in db") + stats = self.central.get_stats() + self.log(f"scout complete: {stats.get('total_humans', 0)} humans in CENTRAL") def match_priority_users(self): - """find matches for priority users (hosts)""" - priority_users = get_priority_users(self.db.conn) + """find matches for priority users (hosts) using CENTRAL data""" + priority_users = get_priority_users(self.local_db.conn) if not priority_users: return - self.log(f"matching for {len(priority_users)} priority users...") + self.log(f"matching for {len(priority_users)} priority users (from CENTRAL)...") - humans = self.db.get_all_humans(min_score=20) + # get humans from CENTRAL + humans = self.central.get_all_humans(min_score=20) for puser in priority_users: - # build priority user's fingerprint from their linked profiles + # use stored signals first (from discovery/scoring) puser_signals = [] - puser_text = [] + if puser.get('signals'): + stored = puser['signals'] + if isinstance(stored, str): + try: + stored = json.loads(stored) + except: + stored = [] + puser_signals.extend(stored) - if puser.get('bio'): - puser_text.append(puser['bio']) - if puser.get('interests'): + # supplement with interests if no signals stored + if not puser_signals and puser.get('interests'): interests = json.loads(puser['interests']) if isinstance(puser['interests'], str) else puser['interests'] puser_signals.extend(interests) - if puser.get('looking_for'): - puser_text.append(puser['looking_for']) - # analyze their linked github if available - if puser.get('github'): - gh_user = analyze_github_user(puser['github']) - if gh_user: - puser_signals.extend(gh_user.get('signals', [])) + if not puser_signals: + self.log(f" skipping {puser.get('name')} - no signals") + continue - puser_fingerprint = { - 'values_vector': {}, - 'skills': {}, - 'interests': list(set(puser_signals)), - 'location_pref': 'pnw' if puser.get('location') and 'seattle' in puser['location'].lower() else None, - } - - # score text - if puser_text: - _, text_signals, _ = analyze_text(' '.join(puser_text)) - puser_signals.extend(text_signals) - - # find matches matches_found = 0 for human in humans: - # skip if it's their own profile on another platform human_user = human.get('username', '').lower() if puser.get('github') and human_user == puser['github'].lower(): continue @@ -253,17 +237,18 @@ class ConnectDaemon: if puser.get('mastodon') and human_user == puser['mastodon'].lower().split('@')[0]: continue - # calculate overlap human_signals = human.get('signals', []) if isinstance(human_signals, str): - human_signals = json.loads(human_signals) + try: + human_signals = json.loads(human_signals) + except: + human_signals = [] shared = set(puser_signals) & set(human_signals) overlap_score = len(shared) * 10 - # location bonus if puser.get('location') and human.get('location'): - if 'seattle' in human['location'].lower() or 'pnw' in human['location'].lower(): + if 'seattle' in str(human.get('location', '')).lower() or 'pnw' in str(human.get('location', '')).lower(): overlap_score += 20 if overlap_score >= MIN_OVERLAP_PRIORITY: @@ -271,33 +256,31 @@ class ConnectDaemon: 'overlap_score': overlap_score, 'overlap_reasons': [f"shared: {', '.join(list(shared)[:5])}"] if shared else [], } - save_priority_match(self.db.conn, puser['id'], human['id'], overlap_data) + save_priority_match(self.local_db.conn, puser['id'], human['id'], overlap_data) matches_found += 1 if matches_found: self.log(f" found {matches_found} matches for {puser['name'] or puser['email']}") def match_strangers(self): - """find matches between discovered humans (altruistic)""" - self.log("matching strangers...") + """find matches between discovered humans - save to CENTRAL""" + self.log("matching strangers (-> CENTRAL)...") - humans = self.db.get_all_humans(min_score=40) + humans = self.central.get_all_humans(min_score=40) if len(humans) < 2: return - # generate fingerprints fingerprints = {} for human in humans: fp = generate_fingerprint(human) fingerprints[human['id']] = fp - # find pairs matches_found = 0 + new_matches = [] from itertools import combinations for human_a, human_b in combinations(humans, 2): - # skip same platform same user if human_a['platform'] == human_b['platform']: if human_a['username'] == human_b['username']: continue @@ -308,71 +291,35 @@ class ConnectDaemon: overlap = find_overlap(human_a, human_b, fp_a, fp_b) if overlap and overlap["overlap_score"] >= MIN_OVERLAP_STRANGERS: - # save match - self.db.save_match(human_a['id'], human_b['id'], overlap) + new_matches.append({ + 'human_a_id': human_a['id'], + 'human_b_id': human_b['id'], + 'overlap_score': overlap['overlap_score'], + 'overlap_reasons': json.dumps(overlap.get('overlap_reasons', [])) + }) matches_found += 1 - if matches_found: - self.log(f"found {matches_found} stranger matches") + # bulk push to central + if new_matches: + self.log(f"pushing {len(new_matches)} matches to CENTRAL...") + try: + created = self.central.create_matches_bulk(new_matches) + self.log(f" central: {created} matches created") + except Exception as e: + self.log(f" central push error: {e}") self.last_match = datetime.now() - def claim_from_central(self, human_id, match_id=None, outreach_type='intro'): - """claim outreach from central - returns outreach_id or None if already claimed""" - if not self.central: - return -1 # local mode, always allow - try: - return self.central.claim_outreach(human_id, match_id, outreach_type) - except Exception as e: - self.log(f"central claim error: {e}") - return -1 # allow local if central fails - - def complete_on_central(self, outreach_id, status, sent_via=None, draft=None, error=None): - """mark outreach complete on central""" - if not self.central or outreach_id == -1: - return - try: - self.central.complete_outreach(outreach_id, status, sent_via, draft, error) - except Exception as e: - self.log(f"central complete error: {e}") - - def sync_to_central(self, humans=None, matches=None): - """sync local data to central""" - if not self.central: - return - try: - if humans: - self.central.upsert_humans_bulk(humans) - if matches: - self.central.create_matches_bulk(matches) - except Exception as e: - self.log(f"central sync error: {e}") - def send_stranger_intros(self): - """send intros to connect strangers (or preview in dry-run mode)""" + """send intros using CENTRAL data""" self.reset_daily_limits() if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY: self.log("daily intro limit reached") return - # get unsent matches - c = self.db.conn.cursor() - c.execute('''SELECT m.*, - ha.id as a_id, ha.username as a_user, ha.platform as a_platform, - ha.name as a_name, ha.url as a_url, ha.contact as a_contact, - ha.signals as a_signals, ha.extra as a_extra, - hb.id as b_id, hb.username as b_user, hb.platform as b_platform, - hb.name as b_name, hb.url as b_url, hb.contact as b_contact, - hb.signals as b_signals, hb.extra as b_extra - FROM matches m - JOIN humans ha ON m.human_a_id = ha.id - JOIN humans hb ON m.human_b_id = hb.id - WHERE m.status = 'pending' - ORDER BY m.overlap_score DESC - LIMIT 10''') - - matches = c.fetchall() + # get pending matches from CENTRAL + matches = self.central.get_matches(min_score=MIN_OVERLAP_STRANGERS, limit=20) if self.dry_run: self.log(f"DRY RUN: previewing {len(matches)} potential intros") @@ -381,59 +328,60 @@ class ConnectDaemon: if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY: break - match = dict(match) + # get full human data + human_a = self.central.get_human(match['human_a_id']) + human_b = self.central.get_human(match['human_b_id']) - # build human dicts - human_a = { - 'id': match['a_id'], - 'username': match['a_user'], - 'platform': match['a_platform'], - 'name': match['a_name'], - 'url': match['a_url'], - 'contact': match['a_contact'], - 'signals': match['a_signals'], - 'extra': match['a_extra'], - } - human_b = { - 'id': match['b_id'], - 'username': match['b_user'], - 'platform': match['b_platform'], - 'name': match['b_name'], - 'url': match['b_url'], - 'contact': match['b_contact'], - 'signals': match['b_signals'], - 'extra': match['b_extra'], - } + if not human_a or not human_b: + continue match_data = { 'id': match['id'], 'human_a': human_a, 'human_b': human_b, 'overlap_score': match['overlap_score'], - 'overlap_reasons': match['overlap_reasons'], + 'overlap_reasons': match.get('overlap_reasons', ''), } - # try to send intro to person with email for recipient, other in [(human_a, human_b), (human_b, human_a)]: contact = recipient.get('contact', {}) if isinstance(contact, str): - contact = json.loads(contact) + try: + contact = json.loads(contact) + except: + contact = {} email = contact.get('email') if not email: continue - # draft intro - intro = draft_intro(match_data, recipient='a' if recipient == human_a else 'b') + # check if already contacted + if self.central.already_contacted(recipient['id']): + continue - # parse overlap reasons for display - reasons = match['overlap_reasons'] + # get token and interest count for recipient + try: + recipient_token = self.central.get_token(recipient['id'], match.get('id')) + interested_count = self.central.get_interested_count(recipient['id']) + except Exception as e: + print(f"[intro] failed to get token/count: {e}") + recipient_token = None + interested_count = 0 + + intro = draft_intro(match_data, + recipient='a' if recipient == human_a else 'b', + recipient_token=recipient_token, + interested_count=interested_count) + + reasons = match.get('overlap_reasons', '') if isinstance(reasons, str): - reasons = json.loads(reasons) + try: + reasons = json.loads(reasons) + except: + reasons = [] reason_summary = ', '.join(reasons[:3]) if reasons else 'aligned values' if self.dry_run: - # print preview print("\n" + "=" * 60) print(f"TO: {recipient['username']} ({recipient['platform']})") print(f"EMAIL: {email}") @@ -447,13 +395,11 @@ class ConnectDaemon: print("=" * 60) break else: - # claim from central first - outreach_id = self.claim_from_central(recipient['id'], match['id'], 'intro') + outreach_id = self.central.claim_outreach(recipient['id'], match['id'], 'intro') if outreach_id is None: - self.log(f"skipping {recipient['username']} - already claimed by another instance") + self.log(f"skipping {recipient['username']} - already claimed") continue - # actually send success, error = send_email( email, f"connectd: you might want to meet {other['username']}", @@ -463,24 +409,124 @@ class ConnectDaemon: if success: self.log(f"sent intro to {recipient['username']} ({email})") self.intros_today += 1 - self.complete_on_central(outreach_id, 'sent', 'email', intro['draft']) - - # mark match as intro_sent - c.execute('UPDATE matches SET status = "intro_sent" WHERE id = ?', - (match['id'],)) - self.db.conn.commit() + self.central.complete_outreach(outreach_id, 'sent', 'email', intro['draft']) break else: self.log(f"failed to send to {email}: {error}") - self.complete_on_central(outreach_id, 'failed', error=error) + self.central.complete_outreach(outreach_id, 'failed', error=error) self.last_intro = datetime.now() + def send_priority_user_intros(self): + """send intros TO priority users (hosts) about their matches""" + self.reset_daily_limits() + + priority_users = get_priority_users(self.local_db.conn) + if not priority_users: + return + + self.log(f"checking intros for {len(priority_users)} priority users...") + + for puser in priority_users: + if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY: + break + + # get email + email = puser.get('email') + if not email: + continue + + # get their matches from local priority_matches table + matches = get_priority_user_matches(self.local_db.conn, puser['id'], status='new', limit=5) + + if not matches: + continue + + for match in matches: + if not self.dry_run and self.intros_today >= MAX_INTROS_PER_DAY: + break + + # get the matched human from CENTRAL (matched_human_id is central id) + human_id = match.get('matched_human_id') + if not human_id: + continue + + human = self.central.get_human(human_id) + if not human: + continue + + # build match data for drafting + overlap_reasons = match.get('overlap_reasons', '[]') + if isinstance(overlap_reasons, str): + try: + overlap_reasons = json.loads(overlap_reasons) + except: + overlap_reasons = [] + + puser_name = puser.get('name') or puser.get('email', '').split('@')[0] + human_name = human.get('name') or human.get('username') + + # draft intro TO priority user ABOUT the matched human + match_data = { + 'id': match.get('id'), + 'human_a': { + 'username': puser_name, + 'platform': 'host', + 'name': puser_name, + 'bio': puser.get('bio', ''), + 'signals': puser.get('signals', []), + }, + 'human_b': human, + 'overlap_score': match.get('overlap_score', 0), + 'overlap_reasons': overlap_reasons, + } + + # try to get token for priority user (they might have a central ID) + recipient_token = None + interested_count = 0 + if puser.get('central_id'): + try: + recipient_token = self.central.get_token(puser['central_id'], match.get('id')) + interested_count = self.central.get_interested_count(puser['central_id']) + except: + pass + + intro = draft_intro(match_data, recipient='a', + recipient_token=recipient_token, + interested_count=interested_count) + + reason_summary = ', '.join(overlap_reasons[:3]) if overlap_reasons else 'aligned values' + + if self.dry_run: + print("\n" + "=" * 60) + print("PRIORITY USER INTRO") + print("=" * 60) + print(f"TO: {puser_name} ({email})") + print(f"ABOUT: {human_name} ({human.get('platform')})") + print(f"SCORE: {match.get('overlap_score', 0):.0f} ({reason_summary})") + print("-" * 60) + print("MESSAGE:") + print(intro['draft']) + print("-" * 60) + print("[DRY RUN - NOT SENT]") + print("=" * 60) + else: + success, error = send_email( + email, + f"connectd: you might want to meet {human_name}", + intro['draft'] + ) + + if success: + self.log(f"sent priority intro to {puser_name} about {human_name}") + self.intros_today += 1 + # mark match as notified + mark_match_viewed(self.local_db.conn, match['id']) + else: + self.log(f"failed to send priority intro to {email}: {error}") + def send_lost_builder_intros(self): - """ - reach out to lost builders - different tone, lower volume. - these people need encouragement, not networking. - """ + """reach out to lost builders using CENTRAL data""" self.reset_daily_limits() lost_config = get_lost_config() @@ -493,43 +539,60 @@ class ConnectDaemon: self.log("daily lost builder intro limit reached") return - # find lost builders with matching active builders - matches, error = find_matches_for_lost_builders( - self.db, - min_lost_score=lost_config.get('min_lost_score', 40), - min_values_score=lost_config.get('min_values_score', 20), + # get lost builders from CENTRAL + lost_builders = self.central.get_lost_builders( + min_score=lost_config.get('min_lost_score', 40), limit=max_per_day - self.lost_intros_today ) - if error: - self.log(f"lost builder matching error: {error}") - return + # get active builders from CENTRAL + builders = self.central.get_builders(min_score=50, limit=100) - if not matches: - self.log("no lost builders ready for outreach") + if not lost_builders or not builders: + self.log("no lost builders or builders available") return if self.dry_run: - self.log(f"DRY RUN: previewing {len(matches)} lost builder intros") + self.log(f"DRY RUN: previewing {len(lost_builders)} lost builder intros") - for match in matches: + for lost in lost_builders: if not self.dry_run and self.lost_intros_today >= max_per_day: break - lost = match['lost_user'] - builder = match['inspiring_builder'] + # find matching builder + best_builder = None + best_score = 0 + for builder in builders: + lost_signals = lost.get('signals', []) + builder_signals = builder.get('signals', []) + if isinstance(lost_signals, str): + try: + lost_signals = json.loads(lost_signals) + except: + lost_signals = [] + if isinstance(builder_signals, str): + try: + builder_signals = json.loads(builder_signals) + except: + builder_signals = [] + + shared = set(lost_signals) & set(builder_signals) + if len(shared) > best_score: + best_score = len(shared) + best_builder = builder + + if not best_builder: + continue lost_name = lost.get('name') or lost.get('username') - builder_name = builder.get('name') or builder.get('username') + builder_name = best_builder.get('name') or best_builder.get('username') - # draft intro - draft, draft_error = draft_lost_intro(lost, builder, lost_config) + draft, draft_error = draft_lost_intro(lost, best_builder, lost_config) if draft_error: self.log(f"error drafting lost intro for {lost_name}: {draft_error}") continue - # determine best contact method (activity-based) method, contact_info = determine_best_contact(lost) if self.dry_run: @@ -539,9 +602,7 @@ class ConnectDaemon: print(f"TO: {lost_name} ({lost.get('platform')})") print(f"DELIVERY: {method} โ {contact_info}") print(f"LOST SCORE: {lost.get('lost_potential_score', 0)}") - print(f"VALUES SCORE: {lost.get('score', 0)}") print(f"INSPIRING BUILDER: {builder_name}") - print(f"SHARED INTERESTS: {', '.join(match.get('shared_interests', []))}") print("-" * 60) print("MESSAGE:") print(draft) @@ -549,12 +610,11 @@ class ConnectDaemon: print("[DRY RUN - NOT SENT]") print("=" * 60) else: - # build match data for unified delivery match_data = { - 'human_a': builder, # inspiring builder - 'human_b': lost, # lost builder (recipient) - 'overlap_score': match.get('match_score', 0), - 'overlap_reasons': match.get('shared_interests', []), + 'human_a': best_builder, + 'human_b': lost, + 'overlap_score': best_score * 10, + 'overlap_reasons': [], } success, error, delivery_method = deliver_intro(match_data, draft) @@ -562,7 +622,6 @@ class ConnectDaemon: if success: self.log(f"sent lost builder intro to {lost_name} via {delivery_method}") self.lost_intros_today += 1 - self.db.mark_lost_outreach(lost['id']) else: self.log(f"failed to reach {lost_name} via {delivery_method}: {error}") @@ -571,9 +630,8 @@ class ConnectDaemon: def run(self): """main daemon loop""" - self.log("connectd daemon starting...") + self.log("connectd daemon starting (CENTRAL MODE)...") - # start API server start_api_thread() self.log("api server started on port 8099") @@ -592,36 +650,31 @@ class ConnectDaemon: while self.running: now = datetime.now() - # scout cycle if not self.last_scout or (now - self.last_scout).seconds >= SCOUT_INTERVAL: self.scout_cycle() self._update_api_state() - # match cycle if not self.last_match or (now - self.last_match).seconds >= MATCH_INTERVAL: self.match_priority_users() self.match_strangers() self._update_api_state() - # intro cycle if not self.last_intro or (now - self.last_intro).seconds >= INTRO_INTERVAL: self.send_stranger_intros() + self.send_priority_user_intros() self._update_api_state() - # lost builder cycle if not self.last_lost or (now - self.last_lost).seconds >= LOST_INTERVAL: self.send_lost_builder_intros() self._update_api_state() - # sleep between checks time.sleep(60) self.log("connectd daemon stopped") - self.db.close() + self.local_db.close() def run_daemon(dry_run=False): - """entry point""" daemon = ConnectDaemon(dry_run=dry_run) daemon.run() diff --git a/db/users.py b/db/users.py index 0615389..95d4d97 100644 --- a/db/users.py +++ b/db/users.py @@ -139,20 +139,18 @@ def save_priority_match(conn, priority_user_id, human_id, overlap_data): def get_priority_user_matches(conn, priority_user_id, status=None, limit=50): - """get matches for a priority user""" + """get matches for a priority user (humans fetched from CENTRAL separately)""" c = conn.cursor() if status: - c.execute('''SELECT pm.*, h.* FROM priority_matches pm - JOIN humans h ON pm.matched_human_id = h.id - WHERE pm.priority_user_id = ? AND pm.status = ? - ORDER BY pm.overlap_score DESC + c.execute('''SELECT * FROM priority_matches + WHERE priority_user_id = ? AND status = ? + ORDER BY overlap_score DESC LIMIT ?''', (priority_user_id, status, limit)) else: - c.execute('''SELECT pm.*, h.* FROM priority_matches pm - JOIN humans h ON pm.matched_human_id = h.id - WHERE pm.priority_user_id = ? - ORDER BY pm.overlap_score DESC + c.execute('''SELECT * FROM priority_matches + WHERE priority_user_id = ? + ORDER BY overlap_score DESC LIMIT ?''', (priority_user_id, limit)) return [dict(row) for row in c.fetchall()] diff --git a/introd/draft.py b/introd/draft.py index 3cbf160..5e3cd71 100644 --- a/introd/draft.py +++ b/introd/draft.py @@ -1,10 +1,14 @@ """ introd/draft.py - AI writes intro messages referencing both parties' work +now with interest system links """ import json -# intro template - transparent about being AI, neutral third party +# base URL for connectd profiles +CONNECTD_URL = "https://connectd.sudoxreboot.com" + +# intro template - now with interest links INTRO_TEMPLATE = """hi {recipient_name}, i'm an AI that connects isolated builders working on similar things. @@ -17,7 +21,8 @@ overlap: {overlap_summary} thought you might benefit from knowing each other. -their work: {other_url} +their profile: {profile_url} +{interested_line} no pitch. just connection. ignore if not useful. @@ -32,7 +37,7 @@ you: {recipient_summary} overlap: {overlap_summary} -their work: {other_url} +their profile: {profile_url} no pitch, just connection. """ @@ -51,12 +56,18 @@ def summarize_human(human_data): # signals/interests signals = human_data.get('signals', []) if isinstance(signals, str): - signals = json.loads(signals) + try: + signals = json.loads(signals) + except: + signals = [] # extra data extra = human_data.get('extra', {}) if isinstance(extra, str): - extra = json.loads(extra) + try: + extra = json.loads(extra) + except: + extra = {} # build summary based on available data topics = extra.get('topics', []) @@ -103,7 +114,10 @@ def summarize_overlap(overlap_data): """generate overlap summary""" reasons = overlap_data.get('overlap_reasons', []) if isinstance(reasons, str): - reasons = json.loads(reasons) + try: + reasons = json.loads(reasons) + except: + reasons = [] if reasons: return ' | '.join(reasons[:3]) @@ -116,12 +130,14 @@ def summarize_overlap(overlap_data): return "aligned values and interests" -def draft_intro(match_data, recipient='a'): +def draft_intro(match_data, recipient='a', recipient_token=None, interested_count=0): """ draft an intro message for a match match_data: dict with human_a, human_b, overlap info recipient: 'a' or 'b' - who receives this intro + recipient_token: token for the recipient (to track who clicked) + interested_count: how many people are already interested in the recipient returns: dict with draft text, channel, metadata """ @@ -135,19 +151,37 @@ def draft_intro(match_data, recipient='a'): # get names recipient_name = recipient_human.get('name') or recipient_human.get('username', 'friend') other_name = other_human.get('name') or other_human.get('username', 'someone') + other_username = other_human.get('username', '') # generate summaries recipient_summary = summarize_human(recipient_human) other_summary = summarize_human(other_human) overlap_summary = summarize_overlap(match_data) - # other's url - other_url = other_human.get('url', '') + # build profile URL with token if available + if other_username: + profile_url = f"{CONNECTD_URL}/{other_username}" + if recipient_token: + profile_url += f"?t={recipient_token}" + else: + profile_url = other_human.get('url', '') + + # interested line - tells them about their inbox + interested_line = '' + if recipient_token: + interested_url = f"{CONNECTD_URL}/interested/{recipient_token}" + if interested_count > 0: + interested_line = f"\n{interested_count} people already want to meet you: {interested_url}" + else: + interested_line = f"\nbe the first to connect: {interested_url}" # determine best channel contact = recipient_human.get('contact', {}) if isinstance(contact, str): - contact = json.loads(contact) + try: + contact = json.loads(contact) + except: + contact = {} channel = None channel_address = None @@ -156,15 +190,12 @@ def draft_intro(match_data, recipient='a'): if contact.get('email'): channel = 'email' channel_address = contact['email'] - # github issue/discussion elif recipient_human.get('platform') == 'github': channel = 'github' channel_address = recipient_human.get('url') - # mastodon DM elif recipient_human.get('platform') == 'mastodon': channel = 'mastodon' channel_address = recipient_human.get('username') - # reddit message elif recipient_human.get('platform') == 'reddit': channel = 'reddit' channel_address = recipient_human.get('username') @@ -180,12 +211,13 @@ def draft_intro(match_data, recipient='a'): # render draft draft = template.format( - recipient_name=recipient_name.split()[0] if recipient_name else 'friend', # first name only + recipient_name=recipient_name.split()[0] if recipient_name else 'friend', recipient_summary=recipient_summary, other_name=other_name.split()[0] if other_name else 'someone', other_summary=other_summary, overlap_summary=overlap_summary, - other_url=other_url, + profile_url=profile_url, + interested_line=interested_line, ) return { @@ -196,15 +228,16 @@ def draft_intro(match_data, recipient='a'): 'draft': draft, 'overlap_score': match_data.get('overlap_score', 0), 'match_id': match_data.get('id'), + 'recipient_token': recipient_token, } -def draft_intros_for_match(match_data): +def draft_intros_for_match(match_data, token_a=None, token_b=None, interested_a=0, interested_b=0): """ draft intros for both parties in a match returns list of two intro dicts """ - intro_a = draft_intro(match_data, recipient='a') - intro_b = draft_intro(match_data, recipient='b') + intro_a = draft_intro(match_data, recipient='a', recipient_token=token_a, interested_count=interested_a) + intro_b = draft_intro(match_data, recipient='b', recipient_token=token_b, interested_count=interested_b) return [intro_a, intro_b] diff --git a/introd/groq_draft.py b/introd/groq_draft.py index 6caa79a..2252007 100644 --- a/introd/groq_draft.py +++ b/introd/groq_draft.py @@ -71,7 +71,7 @@ email: connectd@sudoxreboot.com """ -def draft_intro_with_llm(match_data: dict, recipient: str = 'a', dry_run: bool = True): +def draft_intro_with_llm(match_data: dict, recipient: str = 'a', dry_run: bool = True, recipient_token: str = None, interested_count: int = 0): """ draft an intro message using groq llm. @@ -219,9 +219,45 @@ return ONLY the subject line.""" subject = subject_response.choices[0].message.content.strip().strip('"').strip("'") + # add profile link and interest section + profile_url = f"https://connectd.sudoxreboot.com/{about_name}" + if recipient_token: + profile_url += f"?t={recipient_token}" + + profile_section_html = f""" ++ public data is public. this is everything we've gathered from public sources. +
+{raw_json}
+