connectd/scoutd/lemmy.py
root 99946bfef5 autonomous daemon with platform-native contact detection
- determine_contact_method now recognizes mastodon/bluesky users by platform
- username IS the handle for platform-native users
- fixed orphaned matches table issue
- wave 1 intros sent successfully
2025-12-16 09:22:58 +00:00

322 lines
9.5 KiB
Python

"""
scoutd/lemmy.py - lemmy (fediverse reddit) discovery
lemmy is federated so we hit multiple instances.
great for finding lost builders in communities like:
- /c/programming, /c/technology, /c/linux
- /c/antiwork, /c/workreform (lost builders!)
- /c/selfhosted, /c/privacy, /c/opensource
supports authenticated access for private instances and DM delivery.
"""
import requests
import json
import time
import os
from datetime import datetime
from pathlib import Path
from .signals import analyze_text
from .lost import (
analyze_social_for_lost_signals,
analyze_text_for_lost_signals,
classify_user,
)
# auth config from environment; each value falls back to '' when the
# variable is unset, which the auth/DM helpers treat as "not configured"
LEMMY_INSTANCE = os.environ.get('LEMMY_INSTANCE', '')
LEMMY_USERNAME = os.environ.get('LEMMY_USERNAME', '')
LEMMY_PASSWORD = os.environ.get('LEMMY_PASSWORD', '')
# auth token cache: a single module-wide JWT, filled in by get_auth_token()
# after a successful login and reused on later calls
_auth_token = None
# popular lemmy instances; scrape_lemmy() walks these in order (the configured
# LEMMY_INSTANCE is put in front when it is not already in the list)
LEMMY_INSTANCES = [
'lemmy.ml',
'lemmy.world',
'programming.dev',
'lemm.ee',
'sh.itjust.works',
]
# communities to scout (format: community@instance or just community for local)
# each name is passed as `community_name` to every instance in LEMMY_INSTANCES
TARGET_COMMUNITIES = [
# builder communities
'programming',
'selfhosted',
'linux',
'opensource',
'privacy',
'technology',
'webdev',
'rust',
'python',
'golang',
# lost builder communities (people struggling, stuck, seeking)
'antiwork',
'workreform',
'careerguidance',
'cscareerquestions',
'learnprogramming',
'findapath',
]
# on-disk cache directory: <package parent>/db/cache/lemmy
# NOTE: created as a side effect of importing this module
CACHE_DIR = Path(__file__).parent.parent / 'db' / 'cache' / 'lemmy'
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def get_auth_token(instance=None):
    """get auth token (jwt) for a lemmy instance, logging in if needed.

    Args:
        instance: host to authenticate against; defaults to LEMMY_INSTANCE.

    Returns:
        The JWT string, or None when credentials are not configured, the
        login is rejected, or the request fails.

    The module keeps exactly one cached token, so the cache is only consulted
    and populated for the configured LEMMY_INSTANCE. A token issued by one
    host is never handed back for a different host.
    """
    global _auth_token
    target = instance or LEMMY_INSTANCE
    cacheable = target == LEMMY_INSTANCE
    if cacheable and _auth_token:
        return _auth_token
    if not all([target, LEMMY_USERNAME, LEMMY_PASSWORD]):
        return None
    try:
        url = f"https://{target}/api/v3/user/login"
        resp = requests.post(url, json={
            'username_or_email': LEMMY_USERNAME,
            'password': LEMMY_PASSWORD,
        }, timeout=30)
        if resp.status_code != 200:
            return None
        token = resp.json().get('jwt')
    except Exception as e:
        # best-effort: any network/parse failure just means "no token"
        print(f"lemmy auth error: {e}")
        return None
    if cacheable:
        _auth_token = token
    return token
def send_lemmy_dm(recipient_username, message, dry_run=False):
    """send a private message via lemmy, from the configured LEMMY_INSTANCE.

    Args:
        recipient_username: `username` (local user) or `username@instance`;
            a leading `@` is tolerated.
        message: body of the private message.
        dry_run: when true, only print what would be sent.

    Returns:
        (True, None) on success or dry run, otherwise (False, error_message).
    """
    if not LEMMY_INSTANCE:
        return False, "LEMMY_INSTANCE not configured"
    if dry_run:
        print(f"[dry run] would send lemmy DM to {recipient_username}")
        return True, None
    token = get_auth_token()
    if not token:
        return False, "failed to authenticate with lemmy"
    try:
        # parse recipient - could be username@instance or just username
        handle = recipient_username.lstrip('@')
        username, _, instance = handle.partition('@')
        instance = instance or LEMMY_INSTANCE
        is_local = instance.lower() == LEMMY_INSTANCE.lower()
        # authenticate the lookups too, so private instances can resolve users
        auth_headers = {'Authorization': f'Bearer {token}'}
        # get recipient user id
        user_url = f"https://{LEMMY_INSTANCE}/api/v3/user"
        resp = requests.get(
            user_url,
            params={'username': f"{username}@{instance}"},
            headers=auth_headers,
            timeout=30,
        )
        if resp.status_code != 200 and is_local:
            # try without instance suffix, but only for local users: for a
            # remote recipient the bare name could match a different local
            # account and the DM would reach the wrong person
            resp = requests.get(
                user_url,
                params={'username': username},
                headers=auth_headers,
                timeout=30,
            )
        if resp.status_code != 200:
            return False, f"could not find user {recipient_username}"
        recipient_id = resp.json().get('person_view', {}).get('person', {}).get('id')
        if not recipient_id:
            return False, "could not get recipient id"
        # send DM
        dm_url = f"https://{LEMMY_INSTANCE}/api/v3/private_message"
        resp = requests.post(
            dm_url,
            headers=auth_headers,
            json={
                'content': message,
                'recipient_id': recipient_id,
            },
            timeout=30,
        )
        if resp.status_code == 200:
            return True, None
        return False, f"lemmy DM error: {resp.status_code} - {resp.text}"
    except Exception as e:
        return False, f"lemmy DM error: {str(e)}"
def get_community_posts(instance, community, limit=50, sort='New'):
    """get posts from a lemmy community.

    Args:
        instance: host to query.
        community: value sent as `community_name`.
        limit: maximum number of posts requested.
        sort: sort order sent to the API.

    Returns:
        The list under the response's `posts` key, or [] on a non-200
        status, a missing/null `posts` field, or any request/parse error.
    """
    try:
        url = f"https://{instance}/api/v3/post/list"
        params = {
            'community_name': community,
            'sort': sort,
            'limit': limit,
        }
        resp = requests.get(url, params=params, timeout=30)
        if resp.status_code != 200:
            return []
        return resp.json().get('posts') or []
    except Exception as e:
        # still best-effort, but say why the community came back empty
        print(f"lemmy posts error ({community}@{instance}): {e}")
        return []
def get_user_profile(instance, username):
    """fetch a lemmy user's profile from `instance`.

    Returns the decoded JSON response on HTTP 200; None on any other status
    or when the request or decoding raises.
    """
    try:
        endpoint = f"https://{instance}/api/v3/user"
        response = requests.get(endpoint, params={'username': username}, timeout=30)
        if response.status_code != 200:
            return None
        return response.json()
    except Exception:
        return None
def analyze_lemmy_user(instance, username, posts=None):
    """analyze a lemmy user for values alignment and lost signals.

    Args:
        instance: host the profile is fetched from.
        username: account name on that host.
        posts: optional list of post views by this user; only the first 10
            are analyzed.

    Returns:
        A dict describing the user (platform, username, url, score, signals,
        lost-builder fields, ...), or None when the profile cannot be fetched.
    """
    profile = get_user_profile(instance, username)
    if not profile:
        return None
    # `or {}` / `or ''` guards: a null field must not raise or turn into the
    # literal text "None" inside the analyzed strings
    person_view = profile.get('person_view') or {}
    person = person_view.get('person') or {}
    counts = person_view.get('counts') or {}
    bio = person.get('bio') or ''
    display_name = person.get('display_name') or person.get('name') or username
    post_count = counts.get('post_count', 0)
    comment_count = counts.get('comment_count', 0)
    # analyze bio
    bio_score, bio_signals, bio_reasons = analyze_text(bio)
    # analyze posts if provided
    post_signals = []
    post_text = []
    if posts:
        for post in posts[:10]:
            post_data = post.get('post') or {}
            title = post_data.get('name') or ''
            body = post_data.get('body') or ''
            text = f"{title} {body}"
            post_text.append(text)
            _, signals, _ = analyze_text(text)
            post_signals.extend(signals)
    # de-duplicate while keeping first-seen order so the output is stable
    all_signals = list(dict.fromkeys(bio_signals + post_signals))
    # every post-level signal hit (duplicates included) adds 5 to the score
    total_score = bio_score + len(post_signals) * 5
    # lost builder detection
    profile_for_lost = {
        'bio': bio,
        'post_count': post_count,
        'comment_count': comment_count,
    }
    posts_for_lost = [{'text': t} for t in post_text]
    lost_signals, lost_weight = analyze_social_for_lost_signals(profile_for_lost, posts_for_lost)
    lost_potential_score = lost_weight
    # NOTE(review): the meaning of the fixed 50 is defined by classify_user
    # in .lost - confirm there
    user_type = classify_user(lost_potential_score, 50, total_score)
    return {
        'platform': 'lemmy',
        'username': f"{username}@{instance}",
        'url': f"https://{instance}/u/{username}",
        'name': display_name,
        'bio': bio,
        'location': None,
        'score': total_score,
        'confidence': min(0.9, 0.3 + len(all_signals) * 0.1),
        'signals': all_signals,
        'negative_signals': [],
        'reasons': bio_reasons,
        'contact': {},
        'extra': {
            'instance': instance,
            'post_count': post_count,
            'comment_count': comment_count,
        },
        'lost_potential_score': lost_potential_score,
        'lost_signals': lost_signals,
        'user_type': user_type,
    }
def _creator_home_instance(creator, scraped_instance):
    """return the host a post creator belongs to, or None if it is unknown.

    Local creators (or entries without a `local` flag) live on the instance
    being scraped; remote ones are resolved from the host of their `actor_id`.
    """
    from urllib.parse import urlparse

    if creator.get('local', True):
        return scraped_instance
    return urlparse(creator.get('actor_id') or '').netloc or None


def scrape_lemmy(db, limit_per_community=30):
    """scrape lemmy instances for aligned builders.

    Walks every target community on every instance, groups the posts by
    author, analyzes each author once and saves the ones that score high
    enough through `db.save_human`.

    Args:
        db: object exposing `save_human(result)`.
        limit_per_community: number of posts requested per community.

    Returns:
        Number of humans saved.
    """
    print("scouting lemmy...")
    found = 0
    lost_found = 0
    seen_users = set()
    # build instance list - the configured instance goes first when it is
    # not already one of the well-known ones
    instances = list(LEMMY_INSTANCES)
    if LEMMY_INSTANCE and LEMMY_INSTANCE not in instances:
        instances.insert(0, LEMMY_INSTANCE)
    for instance in instances:
        print(f" instance: {instance}")
        for community in TARGET_COMMUNITIES:
            posts = get_community_posts(instance, community, limit=limit_per_community)
            if not posts:
                continue
            print(f" /c/{community}: {len(posts)} posts")
            # group posts by user, keyed on (name, home instance): federated
            # listings contain remote authors, who must not be attributed to
            # the instance we happen to be scraping
            user_posts = {}
            for post in posts:
                creator = post.get('creator') or {}
                username = creator.get('name')
                if not username:
                    continue
                home = _creator_home_instance(creator, instance)
                if not home:
                    continue
                user_key = (username, home)
                if user_key in seen_users:
                    continue
                user_posts.setdefault(user_key, []).append(post)
            # analyze each user
            for (username, home), authored in user_posts.items():
                seen_users.add((username, home))
                result = analyze_lemmy_user(home, username, authored)
                time.sleep(0.5)  # rate limit, also after a failed lookup
                if not result:
                    continue
                if result['score'] >= 20 or result.get('lost_potential_score', 0) >= 30:
                    db.save_human(result)
                    found += 1
                    if result.get('user_type') in ['lost', 'both']:
                        lost_found += 1
                        print(f" {result['username']}: {result['score']:.0f} (lost: {result['lost_potential_score']:.0f})")
                    elif result['score'] >= 40:
                        print(f" {result['username']}: {result['score']:.0f}")
            time.sleep(1)  # between communities
        time.sleep(2)  # between instances
    print(f"lemmy: found {found} humans ({lost_found} lost builders)")
    return found