implemented federation
This commit is contained in:
@@ -0,0 +1,84 @@
|
||||
package federation
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
// APID returns the canonical ActivityPub actor ID for a local username.
|
||||
func APID(instanceURL, username string) string {
|
||||
return strings.TrimRight(instanceURL, "/") + "/users/" + username
|
||||
}
|
||||
|
||||
// GetOrCreate fetches the FederationActor for a user, creating it with a fresh
|
||||
// RSA-2048 key pair if none exists. Actor URLs are derived from instanceURL.
|
||||
func GetOrCreate(db *xorm.Engine, userID int64, username, instanceURL string) (*models.FederationActor, error) {
|
||||
var actor models.FederationActor
|
||||
if found, _ := db.Where("user_id = ?", userID).Get(&actor); found {
|
||||
return &actor, nil
|
||||
}
|
||||
|
||||
priv, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generate rsa key: %w", err)
|
||||
}
|
||||
|
||||
privPEM := pem.EncodeToMemory(&pem.Block{
|
||||
Type: "RSA PRIVATE KEY",
|
||||
Bytes: x509.MarshalPKCS1PrivateKey(priv),
|
||||
})
|
||||
pubDER, err := x509.MarshalPKIXPublicKey(&priv.PublicKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal public key: %w", err)
|
||||
}
|
||||
pubPEM := pem.EncodeToMemory(&pem.Block{Type: "PUBLIC KEY", Bytes: pubDER})
|
||||
|
||||
base := APID(instanceURL, username)
|
||||
actor = models.FederationActor{
|
||||
UserID: userID,
|
||||
APID: base,
|
||||
InboxURL: base + "/inbox",
|
||||
OutboxURL: base + "/outbox",
|
||||
PublicKey: string(pubPEM),
|
||||
PrivateKey: string(privPEM),
|
||||
}
|
||||
if _, err := db.Insert(&actor); err != nil {
|
||||
// Race condition: another goroutine may have just created it.
|
||||
if found, _ := db.Where("user_id = ?", userID).Get(&actor); found {
|
||||
return &actor, nil
|
||||
}
|
||||
return nil, fmt.Errorf("insert actor: %w", err)
|
||||
}
|
||||
return &actor, nil
|
||||
}
|
||||
|
||||
// ActorJSON builds the JSON-LD actor document returned by GET /users/{username}.
|
||||
func ActorJSON(actor *models.FederationActor, username, displayName string) map[string]any {
|
||||
return map[string]any{
|
||||
"@context": []any{
|
||||
"https://www.w3.org/ns/activitystreams",
|
||||
"https://w3id.org/security/v1",
|
||||
},
|
||||
"id": actor.APID,
|
||||
"type": "Person",
|
||||
"preferredUsername": username,
|
||||
"name": displayName,
|
||||
"inbox": actor.InboxURL,
|
||||
"outbox": actor.OutboxURL,
|
||||
"followers": actor.APID + "/followers",
|
||||
"following": actor.APID + "/following",
|
||||
"publicKey": map[string]any{
|
||||
"id": actor.APID + "#main-key",
|
||||
"owner": actor.APID,
|
||||
"publicKeyPem": actor.PublicKey,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
package federation
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
// Receive persists an inbound activity and dispatches it by type.
|
||||
// The caller is responsible for verifying the HTTP signature before calling this.
|
||||
func Receive(db *xorm.Engine, localActor *models.FederationActor, body []byte) error {
|
||||
var activity map[string]any
|
||||
if err := json.Unmarshal(body, &activity); err != nil {
|
||||
return fmt.Errorf("parse activity: %w", err)
|
||||
}
|
||||
|
||||
actType, _ := activity["type"].(string)
|
||||
actorAPID, _ := activity["actor"].(string)
|
||||
|
||||
entry := &models.FederationActivity{
|
||||
ActorAPID: localActor.APID,
|
||||
Type: actType,
|
||||
ObjectJSON: string(body),
|
||||
Direction: "inbound",
|
||||
RemoteActor: actorAPID,
|
||||
Published: time.Now().UTC(),
|
||||
}
|
||||
db.Insert(entry) //nolint:errcheck
|
||||
|
||||
switch actType {
|
||||
case "Follow":
|
||||
return handleFollow(db, localActor, activity, actorAPID)
|
||||
case "Accept":
|
||||
handleAccept(db, localActor, activity)
|
||||
case "Undo":
|
||||
handleUndo(db, localActor, activity)
|
||||
default:
|
||||
log.Printf("federation: received unhandled activity type %q from %s", actType, actorAPID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleFollow auto-accepts all incoming Follow activities and sends an Accept
|
||||
// back to the sender's inbox.
|
||||
func handleFollow(db *xorm.Engine, localActor *models.FederationActor, follow map[string]any, followerAPID string) error {
|
||||
if followerAPID == "" {
|
||||
return fmt.Errorf("Follow activity missing actor field")
|
||||
}
|
||||
|
||||
// Fetch the follower's remote actor to get their inbox URL.
|
||||
remote, err := FetchActor(db, followerAPID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch follower actor: %w", err)
|
||||
}
|
||||
if remote.InboxURL == "" {
|
||||
return fmt.Errorf("follower has no inbox URL")
|
||||
}
|
||||
|
||||
// Build Accept activity.
|
||||
accept := map[string]any{
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": localActor.APID + "/activities/accept-" + fmt.Sprint(time.Now().UnixNano()),
|
||||
"type": "Accept",
|
||||
"actor": localActor.APID,
|
||||
"object": follow,
|
||||
}
|
||||
|
||||
// Deliver asynchronously so inbox handler returns quickly.
|
||||
go func() {
|
||||
if err := DeliverActivity(localActor, accept, remote.InboxURL); err != nil {
|
||||
log.Printf("federation: deliver Accept to %s: %v", remote.InboxURL, err)
|
||||
return
|
||||
}
|
||||
// Store the outbound Accept.
|
||||
db.Insert(&models.FederationActivity{ //nolint:errcheck
|
||||
ActorAPID: localActor.APID,
|
||||
Type: "Accept",
|
||||
ObjectJSON: mustJSON(accept),
|
||||
Direction: "outbound",
|
||||
RemoteActor: followerAPID,
|
||||
Published: time.Now().UTC(),
|
||||
})
|
||||
log.Printf("federation: accepted Follow from %s", followerAPID)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleAccept(db *xorm.Engine, localActor *models.FederationActor, activity map[string]any) {
|
||||
// A remote actor accepted our Follow. Nothing to store beyond the inbox entry.
|
||||
log.Printf("federation: received Accept for actor %s", localActor.APID)
|
||||
}
|
||||
|
||||
func handleUndo(db *xorm.Engine, localActor *models.FederationActor, activity map[string]any) {
|
||||
// Common case: undo a Follow (unfollow).
|
||||
obj, _ := activity["object"].(map[string]any)
|
||||
if obj == nil {
|
||||
return
|
||||
}
|
||||
if t, _ := obj["type"].(string); t == "Follow" {
|
||||
log.Printf("federation: received Undo(Follow) for actor %s", localActor.APID)
|
||||
}
|
||||
}
|
||||
|
||||
func mustJSON(v any) string {
|
||||
b, _ := json.Marshal(v)
|
||||
return string(b)
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package federation
|
||||
|
||||
import (
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
const activitiesPerPage = 20
|
||||
|
||||
// Collection builds an ActivityStreams OrderedCollection (or page) for an actor's outbox.
|
||||
// page=0 returns the collection summary; page≥1 returns a paginated OrderedCollectionPage.
|
||||
func Collection(db *xorm.Engine, actorAPID string, outboxURL string, page int) map[string]any {
|
||||
total, _ := db.Where("actor_ap_id = ? AND direction = 'outbound'", actorAPID).
|
||||
Count(&models.FederationActivity{})
|
||||
|
||||
if page == 0 {
|
||||
return map[string]any{
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": outboxURL,
|
||||
"type": "OrderedCollection",
|
||||
"totalItems": total,
|
||||
"first": outboxURL + "?page=1",
|
||||
}
|
||||
}
|
||||
|
||||
offset := (page - 1) * activitiesPerPage
|
||||
var activities []models.FederationActivity
|
||||
db.Where("actor_ap_id = ? AND direction = 'outbound'", actorAPID).
|
||||
Desc("published").
|
||||
Limit(activitiesPerPage, offset).
|
||||
Find(&activities)
|
||||
|
||||
items := make([]any, 0, len(activities))
|
||||
for _, a := range activities {
|
||||
items = append(items, rawJSON(a.ObjectJSON))
|
||||
}
|
||||
|
||||
coll := map[string]any{
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": outboxURL + "?page=" + itoa(page),
|
||||
"type": "OrderedCollectionPage",
|
||||
"partOf": outboxURL,
|
||||
"orderedItems": items,
|
||||
}
|
||||
if int64(offset+activitiesPerPage) < total {
|
||||
coll["next"] = outboxURL + "?page=" + itoa(page+1)
|
||||
}
|
||||
return coll
|
||||
}
|
||||
|
||||
// StubCollection returns a minimal OrderedCollection with zero items.
|
||||
// Used for followers/following until the social graph is implemented.
|
||||
func StubCollection(collectionURL string) map[string]any {
|
||||
return map[string]any{
|
||||
"@context": "https://www.w3.org/ns/activitystreams",
|
||||
"id": collectionURL,
|
||||
"type": "OrderedCollection",
|
||||
"totalItems": 0,
|
||||
"orderedItems": []any{},
|
||||
}
|
||||
}
|
||||
|
||||
func itoa(n int) string {
|
||||
if n == 0 {
|
||||
return "0"
|
||||
}
|
||||
result := ""
|
||||
for n > 0 {
|
||||
result = string(rune('0'+n%10)) + result
|
||||
n /= 10
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// rawJSON wraps a JSON string so it marshals as-is (not double-encoded).
|
||||
type rawJSON string
|
||||
|
||||
func (r rawJSON) MarshalJSON() ([]byte, error) {
|
||||
if r == "" {
|
||||
return []byte("null"), nil
|
||||
}
|
||||
return []byte(r), nil
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
package federation
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/forgeo/forgebucket/internal/models"
|
||||
)
|
||||
|
||||
const activityJSONType = "application/activity+json"
|
||||
|
||||
// FetchActor retrieves a remote actor document by APID. If it is already cached
|
||||
// in the remote_actor table it is returned immediately; otherwise it is fetched
|
||||
// over HTTP and persisted before returning.
|
||||
func FetchActor(db *xorm.Engine, apid string) (*models.RemoteActor, error) {
|
||||
var cached models.RemoteActor
|
||||
if found, _ := db.Where("ap_id = ?", apid).Get(&cached); found {
|
||||
return &cached, nil
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
req, err := http.NewRequest("GET", apid, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build request: %w", err)
|
||||
}
|
||||
req.Header.Set("Accept", activityJSONType+", application/ld+json")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch actor %s: %w", apid, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("fetch actor %s: HTTP %d", apid, resp.StatusCode)
|
||||
}
|
||||
|
||||
var doc map[string]any
|
||||
if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
|
||||
return nil, fmt.Errorf("decode actor: %w", err)
|
||||
}
|
||||
|
||||
inbox, _ := doc["inbox"].(string)
|
||||
pubKey := extractPublicKeyPEM(doc)
|
||||
|
||||
actor := &models.RemoteActor{
|
||||
APID: apid,
|
||||
InboxURL: inbox,
|
||||
PublicKey: pubKey,
|
||||
FetchedAt: time.Now().UTC(),
|
||||
}
|
||||
// Upsert: ignore duplicate key errors (concurrent fetch).
|
||||
db.Insert(actor) //nolint:errcheck
|
||||
|
||||
// Reload to get the DB-assigned ID.
|
||||
db.Where("ap_id = ?", apid).Get(actor) //nolint:errcheck
|
||||
return actor, nil
|
||||
}
|
||||
|
||||
// DeliverActivity POSTs a signed activity to a remote actor's inbox.
|
||||
func DeliverActivity(localActor *models.FederationActor, activity map[string]any, recipientInbox string) error {
|
||||
body, err := json.Marshal(activity)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal activity: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", recipientInbox, jsonReader(body))
|
||||
if err != nil {
|
||||
return fmt.Errorf("build request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", activityJSONType)
|
||||
req.Header.Set("Accept", activityJSONType)
|
||||
|
||||
if err := Sign(req, localActor.APID+"#main-key", localActor.PrivateKey); err != nil {
|
||||
return fmt.Errorf("sign: %w", err)
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 15 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("deliver to %s: %w", recipientInbox, err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
return fmt.Errorf("deliver to %s: HTTP %d", recipientInbox, resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractPublicKeyPEM(doc map[string]any) string {
|
||||
pk, ok := doc["publicKey"].(map[string]any)
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
pem, _ := pk["publicKeyPem"].(string)
|
||||
return pem
|
||||
}
|
||||
|
||||
// jsonReader wraps a byte slice in a reader that io.ReadCloser can use.
|
||||
func jsonReader(data []byte) *bytesReader {
|
||||
return &bytesReader{data: data, pos: 0}
|
||||
}
|
||||
|
||||
type bytesReader struct {
|
||||
data []byte
|
||||
pos int
|
||||
}
|
||||
|
||||
func (r *bytesReader) Read(p []byte) (int, error) {
|
||||
if r.pos >= len(r.data) {
|
||||
return 0, fmt.Errorf("EOF")
|
||||
}
|
||||
n := copy(p, r.data[r.pos:])
|
||||
r.pos += n
|
||||
return n, nil
|
||||
}
|
||||
@@ -0,0 +1,175 @@
|
||||
package federation
|
||||
|
||||
import (
|
||||
"crypto"
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/sha256"
|
||||
"crypto/x509"
|
||||
"encoding/base64"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
|
||||
// Sign adds an HTTP Signature header to req using the RSA private key.
|
||||
// Implements draft-cavage-http-signatures (the fediverse de-facto standard).
|
||||
// Signs: (request-target), host, date. If body is set, also signs digest.
|
||||
func Sign(req *http.Request, keyID, privateKeyPEM string) error {
|
||||
if req.Header.Get("Date") == "" {
|
||||
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
||||
}
|
||||
if req.Header.Get("Host") == "" {
|
||||
req.Header.Set("Host", req.URL.Host)
|
||||
}
|
||||
|
||||
method := strings.ToLower(req.Method)
|
||||
target := req.URL.RequestURI()
|
||||
signingString := fmt.Sprintf("(request-target): %s %s\nhost: %s\ndate: %s",
|
||||
method, target,
|
||||
req.Header.Get("Host"),
|
||||
req.Header.Get("Date"),
|
||||
)
|
||||
headers := "(request-target) host date"
|
||||
|
||||
priv, err := parsePrivateKey(privateKeyPEM)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse private key: %w", err)
|
||||
}
|
||||
|
||||
h := sha256.Sum256([]byte(signingString))
|
||||
sig, err := rsa.SignPKCS1v15(rand.Reader, priv, crypto.SHA256, h[:])
|
||||
if err != nil {
|
||||
return fmt.Errorf("sign: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Signature", fmt.Sprintf(
|
||||
`keyId="%s",algorithm="rsa-sha256",headers="%s",signature="%s"`,
|
||||
keyID, headers, base64.StdEncoding.EncodeToString(sig),
|
||||
))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify validates the HTTP Signature header on an incoming request.
|
||||
// It fetches the sender's public key from their actor document (or the local DB).
|
||||
func Verify(r *http.Request, db *xorm.Engine, instanceURL string) error {
|
||||
sigHeader := r.Header.Get("Signature")
|
||||
if sigHeader == "" {
|
||||
return fmt.Errorf("missing Signature header")
|
||||
}
|
||||
|
||||
params := parseSignatureHeader(sigHeader)
|
||||
keyID := params["keyId"]
|
||||
sigB64 := params["signature"]
|
||||
headersList := params["headers"]
|
||||
if keyID == "" || sigB64 == "" {
|
||||
return fmt.Errorf("malformed Signature header")
|
||||
}
|
||||
|
||||
sig, err := base64.StdEncoding.DecodeString(sigB64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode signature: %w", err)
|
||||
}
|
||||
|
||||
// Fetch the public key for this keyId.
|
||||
// keyId is typically "{actorURL}#main-key" — strip the fragment to get the actor APID.
|
||||
actorAPID := strings.SplitN(keyID, "#", 2)[0]
|
||||
pubKeyPEM, err := resolvePublicKey(db, actorAPID, instanceURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve public key for %s: %w", actorAPID, err)
|
||||
}
|
||||
|
||||
// Reconstruct the signing string from the request.
|
||||
signedHeaders := strings.Fields(headersList)
|
||||
if len(signedHeaders) == 0 {
|
||||
signedHeaders = []string{"date"}
|
||||
}
|
||||
var parts []string
|
||||
for _, h := range signedHeaders {
|
||||
switch h {
|
||||
case "(request-target)":
|
||||
parts = append(parts, fmt.Sprintf("(request-target): %s %s",
|
||||
strings.ToLower(r.Method), r.URL.RequestURI()))
|
||||
default:
|
||||
parts = append(parts, h+": "+r.Header.Get(http.CanonicalHeaderKey(h)))
|
||||
}
|
||||
}
|
||||
signingString := strings.Join(parts, "\n")
|
||||
|
||||
pub, err := parsePublicKey(pubKeyPEM)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse public key: %w", err)
|
||||
}
|
||||
|
||||
h := sha256.Sum256([]byte(signingString))
|
||||
if err := rsa.VerifyPKCS1v15(pub, crypto.SHA256, h[:], sig); err != nil {
|
||||
return fmt.Errorf("signature verification failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ── helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
func parseSignatureHeader(header string) map[string]string {
|
||||
params := make(map[string]string)
|
||||
for _, part := range strings.Split(header, ",") {
|
||||
part = strings.TrimSpace(part)
|
||||
idx := strings.Index(part, "=")
|
||||
if idx < 0 {
|
||||
continue
|
||||
}
|
||||
key := strings.TrimSpace(part[:idx])
|
||||
val := strings.Trim(strings.TrimSpace(part[idx+1:]), `"`)
|
||||
params[key] = val
|
||||
}
|
||||
return params
|
||||
}
|
||||
|
||||
func parsePrivateKey(pemStr string) (*rsa.PrivateKey, error) {
|
||||
block, _ := pem.Decode([]byte(pemStr))
|
||||
if block == nil {
|
||||
return nil, fmt.Errorf("no PEM block found")
|
||||
}
|
||||
return x509.ParsePKCS1PrivateKey(block.Bytes)
|
||||
}
|
||||
|
||||
func parsePublicKey(pemStr string) (*rsa.PublicKey, error) {
|
||||
block, _ := pem.Decode([]byte(pemStr))
|
||||
if block == nil {
|
||||
return nil, fmt.Errorf("no PEM block found")
|
||||
}
|
||||
pub, err := x509.ParsePKIXPublicKey(block.Bytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rsaPub, ok := pub.(*rsa.PublicKey)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("not an RSA public key")
|
||||
}
|
||||
return rsaPub, nil
|
||||
}
|
||||
|
||||
// resolvePublicKey returns the public key PEM for an actor APID.
|
||||
// Checks local actors first, then remote cache, then fetches from network.
|
||||
func resolvePublicKey(db *xorm.Engine, actorAPID, instanceURL string) (string, error) {
|
||||
// Check if it's a local actor.
|
||||
var local struct {
|
||||
PublicKey string `xorm:"public_key"`
|
||||
}
|
||||
if found, _ := db.Table("federation_actor").
|
||||
Where("ap_id = ?", actorAPID).
|
||||
Cols("public_key").Get(&local); found && local.PublicKey != "" {
|
||||
return local.PublicKey, nil
|
||||
}
|
||||
|
||||
// Fetch (and cache) from network.
|
||||
remote, err := FetchActor(db, actorAPID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return remote.PublicKey, nil
|
||||
}
|
||||
Reference in New Issue
Block a user