persistence.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. package persistence
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "net/url"
  7. "time"
  8. "github.com/go-fed/activity/streams"
  9. "github.com/go-fed/activity/streams/vocab"
  10. "github.com/owncast/owncast/activitypub/apmodels"
  11. "github.com/owncast/owncast/activitypub/resolvers"
  12. "github.com/owncast/owncast/core/data"
  13. "github.com/owncast/owncast/db"
  14. "github.com/owncast/owncast/models"
  15. "github.com/pkg/errors"
  16. log "github.com/sirupsen/logrus"
  17. )
// _datastore is the package-level datastore handle that the persistence
// functions in this file read from and write to. It is assigned by Setup.
var _datastore *data.Datastore
  19. // Setup will initialize the ActivityPub persistence layer with the provided datastore.
  20. func Setup(datastore *data.Datastore) {
  21. _datastore = datastore
  22. createFederationFollowersTable()
  23. createFederationOutboxTable()
  24. createFederatedActivitiesTable()
  25. }
  26. // AddFollow will save a follow to the datastore.
  27. func AddFollow(follow apmodels.ActivityPubActor, approved bool) error {
  28. log.Traceln("Saving", follow.ActorIri, "as a follower.")
  29. var image string
  30. if follow.Image != nil {
  31. image = follow.Image.String()
  32. }
  33. followRequestObject, err := apmodels.Serialize(follow.RequestObject)
  34. if err != nil {
  35. return errors.Wrap(err, "error serializing follow request object")
  36. }
  37. return createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, followRequestObject, approved)
  38. }
  39. // RemoveFollow will remove a follow from the datastore.
  40. func RemoveFollow(unfollow apmodels.ActivityPubActor) error {
  41. log.Traceln("Removing", unfollow.ActorIri, "as a follower.")
  42. return removeFollow(unfollow.ActorIri)
  43. }
  44. // GetFollower will return a single follower/request given an IRI.
  45. func GetFollower(iri string) (*apmodels.ActivityPubActor, error) {
  46. result, err := _datastore.GetQueries().GetFollowerByIRI(context.Background(), iri)
  47. if err != nil {
  48. return nil, err
  49. }
  50. followIRI, err := url.Parse(result.Request)
  51. if err != nil {
  52. return nil, errors.Wrap(err, "error parsing follow request IRI")
  53. }
  54. iriURL, err := url.Parse(result.Iri)
  55. if err != nil {
  56. return nil, errors.Wrap(err, "error parsing actor IRI")
  57. }
  58. inbox, err := url.Parse(result.Inbox)
  59. if err != nil {
  60. return nil, errors.Wrap(err, "error parsing acting inbox")
  61. }
  62. image, _ := url.Parse(result.Image.String)
  63. var disabledAt *time.Time
  64. if result.DisabledAt.Valid {
  65. disabledAt = &result.DisabledAt.Time
  66. }
  67. follower := apmodels.ActivityPubActor{
  68. ActorIri: iriURL,
  69. Inbox: inbox,
  70. Name: result.Name.String,
  71. Username: result.Username,
  72. Image: image,
  73. FollowRequestIri: followIRI,
  74. DisabledAt: disabledAt,
  75. }
  76. return &follower, nil
  77. }
  78. // ApprovePreviousFollowRequest will approve a follow request.
  79. func ApprovePreviousFollowRequest(iri string) error {
  80. return _datastore.GetQueries().ApproveFederationFollower(context.Background(), db.ApproveFederationFollowerParams{
  81. Iri: iri,
  82. ApprovedAt: sql.NullTime{
  83. Time: time.Now(),
  84. Valid: true,
  85. },
  86. })
  87. }
  88. // BlockOrRejectFollower will block an existing follower or reject a follow request.
  89. func BlockOrRejectFollower(iri string) error {
  90. return _datastore.GetQueries().RejectFederationFollower(context.Background(), db.RejectFederationFollowerParams{
  91. Iri: iri,
  92. DisabledAt: sql.NullTime{
  93. Time: time.Now(),
  94. Valid: true,
  95. },
  96. })
  97. }
  98. func createFollow(actor, inbox, request, name, username, image string, requestObject []byte, approved bool) error {
  99. tx, err := _datastore.DB.Begin()
  100. if err != nil {
  101. log.Debugln(err)
  102. }
  103. defer func() {
  104. _ = tx.Rollback()
  105. }()
  106. var approvedAt sql.NullTime
  107. if approved {
  108. approvedAt = sql.NullTime{
  109. Time: time.Now(),
  110. Valid: true,
  111. }
  112. }
  113. if err = _datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), db.AddFollowerParams{
  114. Iri: actor,
  115. Inbox: inbox,
  116. Name: sql.NullString{String: name, Valid: true},
  117. Username: username,
  118. Image: sql.NullString{String: image, Valid: true},
  119. ApprovedAt: approvedAt,
  120. Request: request,
  121. RequestObject: requestObject,
  122. }); err != nil {
  123. log.Errorln("error creating new federation follow: ", err)
  124. }
  125. return tx.Commit()
  126. }
  127. // UpdateFollower will update the details of a stored follower given an IRI.
  128. func UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error {
  129. _datastore.DbLock.Lock()
  130. defer _datastore.DbLock.Unlock()
  131. tx, err := _datastore.DB.Begin()
  132. if err != nil {
  133. log.Debugln(err)
  134. }
  135. defer func() {
  136. _ = tx.Rollback()
  137. }()
  138. if err = _datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), db.UpdateFollowerByIRIParams{
  139. Inbox: inbox,
  140. Name: sql.NullString{String: name, Valid: true},
  141. Username: username,
  142. Image: sql.NullString{String: image, Valid: true},
  143. Iri: actorIRI,
  144. }); err != nil {
  145. return fmt.Errorf("error updating follower %s %s", actorIRI, err)
  146. }
  147. return tx.Commit()
  148. }
  149. func removeFollow(actor *url.URL) error {
  150. _datastore.DbLock.Lock()
  151. defer _datastore.DbLock.Unlock()
  152. tx, err := _datastore.DB.Begin()
  153. if err != nil {
  154. return err
  155. }
  156. defer func() {
  157. _ = tx.Rollback()
  158. }()
  159. if err := _datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil {
  160. return err
  161. }
  162. return tx.Commit()
  163. }
  164. // createFederatedActivitiesTable will create the accepted
  165. // activities table if needed.
  166. func createFederatedActivitiesTable() {
  167. createTableSQL := `CREATE TABLE IF NOT EXISTS ap_accepted_activities (
  168. "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  169. "iri" TEXT NOT NULL,
  170. "actor" TEXT NOT NULL,
  171. "type" TEXT NOT NULL,
  172. "timestamp" TIMESTAMP NOT NULL
  173. );`
  174. _datastore.MustExec(createTableSQL)
  175. _datastore.MustExec(`CREATE INDEX IF NOT EXISTS idx_iri_actor_index ON ap_accepted_activities (iri,actor);`)
  176. }
  177. func createFederationOutboxTable() {
  178. log.Traceln("Creating federation outbox table...")
  179. createTableSQL := `CREATE TABLE IF NOT EXISTS ap_outbox (
  180. "iri" TEXT NOT NULL,
  181. "value" BLOB,
  182. "type" TEXT NOT NULL,
  183. "created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  184. "live_notification" BOOLEAN DEFAULT FALSE,
  185. PRIMARY KEY (iri));`
  186. _datastore.MustExec(createTableSQL)
  187. _datastore.MustExec(`CREATE INDEX IF NOT EXISTS idx_iri ON ap_outbox (iri);`)
  188. _datastore.MustExec(`CREATE INDEX IF NOT EXISTS idx_type ON ap_outbox (type);`)
  189. _datastore.MustExec(`CREATE INDEX IF NOT EXISTS idx_live_notification ON ap_outbox (live_notification);`)
  190. }
  191. // GetOutboxPostCount will return the number of posts in the outbox.
  192. func GetOutboxPostCount() (int64, error) {
  193. ctx := context.Background()
  194. return _datastore.GetQueries().GetLocalPostCount(ctx)
  195. }
  196. // GetOutbox will return an instance of the outbox populated by stored items.
  197. func GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) {
  198. collection := streams.NewActivityStreamsOrderedCollection()
  199. orderedItems := streams.NewActivityStreamsOrderedItemsProperty()
  200. rows, err := _datastore.GetQueries().GetOutboxWithOffset(
  201. context.Background(),
  202. db.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)},
  203. )
  204. if err != nil {
  205. return collection, err
  206. }
  207. for _, value := range rows {
  208. createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error {
  209. orderedItems.AppendActivityStreamsCreate(activity)
  210. return nil
  211. }
  212. if err := resolvers.Resolve(context.Background(), value, createCallback); err != nil {
  213. return collection, err
  214. }
  215. }
  216. return collection, nil
  217. }
  218. // AddToOutbox will store a single payload to the persistence layer.
  219. func AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error {
  220. tx, err := _datastore.DB.Begin()
  221. if err != nil {
  222. log.Debugln(err)
  223. }
  224. defer func() {
  225. _ = tx.Rollback()
  226. }()
  227. if err = _datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), db.AddToOutboxParams{
  228. Iri: iri,
  229. Value: itemData,
  230. Type: typeString,
  231. LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true},
  232. }); err != nil {
  233. return fmt.Errorf("error creating new item in federation outbox %s", err)
  234. }
  235. return tx.Commit()
  236. }
  237. // GetObjectByIRI will return a string representation of a single object by the IRI.
  238. func GetObjectByIRI(iri string) (string, bool, time.Time, error) {
  239. row, err := _datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri)
  240. return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err
  241. }
  242. // GetLocalPostCount will return the number of posts existing locally.
  243. func GetLocalPostCount() (int64, error) {
  244. ctx := context.Background()
  245. return _datastore.GetQueries().GetLocalPostCount(ctx)
  246. }
  247. // SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
  248. func SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error {
  249. if err := _datastore.GetQueries().AddToAcceptedActivities(context.Background(), db.AddToAcceptedActivitiesParams{
  250. Iri: objectIRI,
  251. Actor: actorIRI,
  252. Type: eventType,
  253. Timestamp: timestamp,
  254. }); err != nil {
  255. return errors.Wrap(err, "error saving event "+objectIRI)
  256. }
  257. return nil
  258. }
  259. // GetInboundActivities will return a collection of saved, federated activities
  260. // limited and offset by the values provided to support pagination.
  261. func GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, int, error) {
  262. ctx := context.Background()
  263. rows, err := _datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, db.GetInboundActivitiesWithOffsetParams{
  264. Limit: int32(limit),
  265. Offset: int32(offset),
  266. })
  267. if err != nil {
  268. return nil, 0, err
  269. }
  270. activities := make([]models.FederatedActivity, 0)
  271. total, err := _datastore.GetQueries().GetInboundActivityCount(context.Background())
  272. if err != nil {
  273. return nil, 0, errors.Wrap(err, "unable to fetch total activity count")
  274. }
  275. for _, row := range rows {
  276. singleActivity := models.FederatedActivity{
  277. IRI: row.Iri,
  278. ActorIRI: row.Actor,
  279. Type: row.Type,
  280. Timestamp: row.Timestamp,
  281. }
  282. activities = append(activities, singleActivity)
  283. }
  284. return activities, int(total), nil
  285. }
  286. // HasPreviouslyHandledInboundActivity will return if we have previously handled
  287. // an inbound federated activity.
  288. func HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) {
  289. exists, err := _datastore.GetQueries().DoesInboundActivityExist(context.Background(), db.DoesInboundActivityExistParams{
  290. Iri: iri,
  291. Actor: actorIRI,
  292. Type: eventType,
  293. })
  294. if err != nil {
  295. return false, err
  296. }
  297. return exists > 0, nil
  298. }