123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425 |
- package federationrepository
- import (
- "context"
- "database/sql"
- "fmt"
- "net/url"
- "time"
- "github.com/go-fed/activity/streams"
- "github.com/go-fed/activity/streams/vocab"
- "github.com/owncast/owncast/models"
- "github.com/owncast/owncast/services/apfederation/apmodels"
- "github.com/owncast/owncast/services/apfederation/resolvers"
- "github.com/owncast/owncast/storage/data"
- "github.com/owncast/owncast/storage/sqlstorage"
- "github.com/owncast/owncast/utils"
- "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
- )
- type FederationRepository struct {
- datastore *data.Store
- }
- func New(datastore *data.Store) *FederationRepository {
- r := &FederationRepository{
- datastore: datastore,
- }
- return r
- }
- // NOTE: This is temporary during the transition period.
- var temporaryGlobalInstance *FederationRepository
- // GetUserRepository will return the user repository.
- func Get() *FederationRepository {
- if temporaryGlobalInstance == nil {
- i := New(data.GetDatastore())
- temporaryGlobalInstance = i
- }
- return temporaryGlobalInstance
- }
- // GetFollowerCount will return the number of followers we're keeping track of.
- func (f *FederationRepository) GetFollowerCount() (int64, error) {
- ctx := context.Background()
- return f.datastore.GetQueries().GetFollowerCount(ctx)
- }
- // GetFederationFollowers will return a slice of the followers we keep track of locally.
- func (f *FederationRepository) GetFederationFollowers(limit int, offset int) ([]models.Follower, int, error) {
- ctx := context.Background()
- total, err := f.datastore.GetQueries().GetFollowerCount(ctx)
- if err != nil {
- return nil, 0, errors.Wrap(err, "unable to fetch total number of followers")
- }
- followersResult, err := f.datastore.GetQueries().GetFederationFollowersWithOffset(ctx, sqlstorage.GetFederationFollowersWithOffsetParams{
- Limit: int32(limit),
- Offset: int32(offset),
- })
- if err != nil {
- return nil, 0, err
- }
- followers := make([]models.Follower, 0)
- for _, row := range followersResult {
- singleFollower := models.Follower{
- Name: row.Name.String,
- Username: row.Username,
- Image: row.Image.String,
- ActorIRI: row.Iri,
- Inbox: row.Inbox,
- Timestamp: utils.NullTime(row.CreatedAt),
- }
- followers = append(followers, singleFollower)
- }
- return followers, int(total), nil
- }
- // GetPendingFollowRequests will return pending follow requests.
- func (f *FederationRepository) GetPendingFollowRequests() ([]models.Follower, error) {
- pendingFollowersResult, err := f.datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background())
- if err != nil {
- return nil, err
- }
- followers := make([]models.Follower, 0)
- for _, row := range pendingFollowersResult {
- singleFollower := models.Follower{
- Name: row.Name.String,
- Username: row.Username,
- Image: row.Image.String,
- ActorIRI: row.Iri,
- Inbox: row.Inbox,
- Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
- }
- followers = append(followers, singleFollower)
- }
- return followers, nil
- }
- // GetBlockedAndRejectedFollowers will return blocked and rejected followers.
- func (f *FederationRepository) GetBlockedAndRejectedFollowers() ([]models.Follower, error) {
- pendingFollowersResult, err := f.datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background())
- if err != nil {
- return nil, err
- }
- followers := make([]models.Follower, 0)
- for _, row := range pendingFollowersResult {
- singleFollower := models.Follower{
- Name: row.Name.String,
- Username: row.Username,
- Image: row.Image.String,
- ActorIRI: row.Iri,
- DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true},
- Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true},
- }
- followers = append(followers, singleFollower)
- }
- return followers, nil
- }
- // AddFollow will save a follow to the datastore.
- func (f *FederationRepository) AddFollow(follow apmodels.ActivityPubActor, approved bool) error {
- log.Traceln("Saving", follow.ActorIri, "as a follower.")
- var image string
- if follow.Image != nil {
- image = follow.Image.String()
- }
- followRequestObject, err := apmodels.Serialize(follow.RequestObject)
- if err != nil {
- return errors.Wrap(err, "error serializing follow request object")
- }
- return f.createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, followRequestObject, approved)
- }
- // RemoveFollow will remove a follow from the datastore.
- func (f *FederationRepository) RemoveFollow(unfollow apmodels.ActivityPubActor) error {
- log.Traceln("Removing", unfollow.ActorIri, "as a follower.")
- return f.removeFollow(unfollow.ActorIri)
- }
- // GetFollower will return a single follower/request given an IRI.
- func (f *FederationRepository) GetFollower(iri string) (*apmodels.ActivityPubActor, error) {
- result, err := f.datastore.GetQueries().GetFollowerByIRI(context.Background(), iri)
- if err != nil {
- return nil, err
- }
- followIRI, err := url.Parse(result.Request)
- if err != nil {
- return nil, errors.Wrap(err, "error parsing follow request IRI")
- }
- iriURL, err := url.Parse(result.Iri)
- if err != nil {
- return nil, errors.Wrap(err, "error parsing actor IRI")
- }
- inbox, err := url.Parse(result.Inbox)
- if err != nil {
- return nil, errors.Wrap(err, "error parsing acting inbox")
- }
- image, _ := url.Parse(result.Image.String)
- var disabledAt *time.Time
- if result.DisabledAt.Valid {
- disabledAt = &result.DisabledAt.Time
- }
- follower := apmodels.ActivityPubActor{
- ActorIri: iriURL,
- Inbox: inbox,
- Name: result.Name.String,
- Username: result.Username,
- Image: image,
- FollowRequestIri: followIRI,
- DisabledAt: disabledAt,
- }
- return &follower, nil
- }
- // ApprovePreviousFollowRequest will approve a follow request.
- func (f *FederationRepository) ApprovePreviousFollowRequest(iri string) error {
- return f.datastore.GetQueries().ApproveFederationFollower(context.Background(), sqlstorage.ApproveFederationFollowerParams{
- Iri: iri,
- ApprovedAt: sql.NullTime{
- Time: time.Now(),
- Valid: true,
- },
- })
- }
- // BlockOrRejectFollower will block an existing follower or reject a follow request.
- func (f *FederationRepository) BlockOrRejectFollower(iri string) error {
- return f.datastore.GetQueries().RejectFederationFollower(context.Background(), sqlstorage.RejectFederationFollowerParams{
- Iri: iri,
- DisabledAt: sql.NullTime{
- Time: time.Now(),
- Valid: true,
- },
- })
- }
- func (f *FederationRepository) createFollow(actor, inbox, request, name, username, image string, requestObject []byte, approved bool) error {
- tx, err := f.datastore.DB.Begin()
- if err != nil {
- log.Debugln(err)
- }
- defer func() {
- _ = tx.Rollback()
- }()
- var approvedAt sql.NullTime
- if approved {
- approvedAt = sql.NullTime{
- Time: time.Now(),
- Valid: true,
- }
- }
- if err = f.datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), sqlstorage.AddFollowerParams{
- Iri: actor,
- Inbox: inbox,
- Name: sql.NullString{String: name, Valid: true},
- Username: username,
- Image: sql.NullString{String: image, Valid: true},
- ApprovedAt: approvedAt,
- Request: request,
- RequestObject: requestObject,
- }); err != nil {
- log.Errorln("error creating new federation follow: ", err)
- }
- return tx.Commit()
- }
- // UpdateFollower will update the details of a stored follower given an IRI.
- func (f *FederationRepository) UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error {
- f.datastore.DbLock.Lock()
- defer f.datastore.DbLock.Unlock()
- tx, err := f.datastore.DB.Begin()
- if err != nil {
- log.Debugln(err)
- }
- defer func() {
- _ = tx.Rollback()
- }()
- if err = f.datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), sqlstorage.UpdateFollowerByIRIParams{
- Inbox: inbox,
- Name: sql.NullString{String: name, Valid: true},
- Username: username,
- Image: sql.NullString{String: image, Valid: true},
- Iri: actorIRI,
- }); err != nil {
- return fmt.Errorf("error updating follower %s %s", actorIRI, err)
- }
- return tx.Commit()
- }
- func (f *FederationRepository) removeFollow(actor *url.URL) error {
- f.datastore.DbLock.Lock()
- defer f.datastore.DbLock.Unlock()
- tx, err := f.datastore.DB.Begin()
- if err != nil {
- return err
- }
- defer func() {
- _ = tx.Rollback()
- }()
- if err := f.datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil {
- return err
- }
- return tx.Commit()
- }
- // GetOutboxPostCount will return the number of posts in the outbox.
- func (f *FederationRepository) GetOutboxPostCount() (int64, error) {
- ctx := context.Background()
- return f.datastore.GetQueries().GetLocalPostCount(ctx)
- }
- // GetOutbox will return an instance of the outbox populated by stored items.
- func (f *FederationRepository) GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) {
- collection := streams.NewActivityStreamsOrderedCollection()
- orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
- r := resolvers.Get()
- rows, err := f.datastore.GetQueries().GetOutboxWithOffset(
- context.Background(),
- sqlstorage.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)},
- )
- if err != nil {
- return collection, err
- }
- for _, value := range rows {
- createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error {
- orderedItems.AppendActivityStreamsCreate(activity)
- return nil
- }
- if err := r.Resolve(context.Background(), value, createCallback); err != nil {
- return collection, err
- }
- }
- return collection, nil
- }
- // AddToOutbox will store a single payload to the persistence layer.
- func (f *FederationRepository) AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error {
- tx, err := f.datastore.DB.Begin()
- if err != nil {
- log.Debugln(err)
- }
- defer func() {
- _ = tx.Rollback()
- }()
- if err = f.datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), sqlstorage.AddToOutboxParams{
- Iri: iri,
- Value: itemData,
- Type: typeString,
- LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true},
- }); err != nil {
- return fmt.Errorf("error creating new item in federation outbox %s", err)
- }
- return tx.Commit()
- }
- // GetObjectByIRI will return a string representation of a single object by the IRI.
- func (f *FederationRepository) GetObjectByIRI(iri string) (string, bool, time.Time, error) {
- row, err := f.datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri)
- return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err
- }
- // GetLocalPostCount will return the number of posts existing locally.
- func (f *FederationRepository) GetLocalPostCount() (int64, error) {
- ctx := context.Background()
- return f.datastore.GetQueries().GetLocalPostCount(ctx)
- }
- // SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
- func (f *FederationRepository) SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error {
- if err := f.datastore.GetQueries().AddToAcceptedActivities(context.Background(), sqlstorage.AddToAcceptedActivitiesParams{
- Iri: objectIRI,
- Actor: actorIRI,
- Type: eventType,
- Timestamp: timestamp,
- }); err != nil {
- return errors.Wrap(err, "error saving event "+objectIRI)
- }
- return nil
- }
- // GetInboundActivities will return a collection of saved, federated activities
- // limited and offset by the values provided to support pagination.
- func (f *FederationRepository) GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, int, error) {
- ctx := context.Background()
- rows, err := f.datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, sqlstorage.GetInboundActivitiesWithOffsetParams{
- Limit: int32(limit),
- Offset: int32(offset),
- })
- if err != nil {
- return nil, 0, err
- }
- activities := make([]models.FederatedActivity, 0)
- total, err := f.datastore.GetQueries().GetInboundActivityCount(context.Background())
- if err != nil {
- return nil, 0, errors.Wrap(err, "unable to fetch total activity count")
- }
- for _, row := range rows {
- singleActivity := models.FederatedActivity{
- IRI: row.Iri,
- ActorIRI: row.Actor,
- Type: row.Type,
- Timestamp: row.Timestamp,
- }
- activities = append(activities, singleActivity)
- }
- return activities, int(total), nil
- }
- // HasPreviouslyHandledInboundActivity will return if we have previously handled
- // an inbound federated activity.
- func (f *FederationRepository) HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) {
- exists, err := f.datastore.GetQueries().DoesInboundActivityExist(context.Background(), sqlstorage.DoesInboundActivityExistParams{
- Iri: iri,
- Actor: actorIRI,
- Type: eventType,
- })
- if err != nil {
- return false, err
- }
- return exists > 0, nil
- }
|