Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MM-48342] Stateful timers #375

Merged
merged 16 commits into from
Feb 21, 2023
13 changes: 13 additions & 0 deletions apps/appclient/mattermost_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ func (c *Client) Unsubscribe(sub *apps.Subscription) error {
return nil
}

func (c *Client) CreateTimer(t *apps.Timer) error {
res, err := c.ClientPP.CreateTimer(t)
if err != nil {
return err
}

if res.StatusCode != http.StatusCreated && res.StatusCode != http.StatusOK {
return errors.Errorf("returned with status %d", res.StatusCode)
}

return nil
}

func (c *Client) StoreOAuth2App(oauth2App apps.OAuth2App) error {
res, err := c.ClientPP.StoreOAuth2App(oauth2App)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions apps/appclient/mattermost_client_pp.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,20 @@ func (c *ClientPP) Unsubscribe(sub *apps.Subscription) (*model.Response, error)
return model.BuildResponse(r), nil
}

func (c *ClientPP) CreateTimer(t *apps.Timer) (*model.Response, error) {
data, err := json.Marshal(t)
if err != nil {
return nil, err
}
r, err := c.DoAPIPOST(c.apipath(appspath.TimerCreate), string(data)) // nolint:bodyclose
if err != nil {
return model.BuildResponse(r), err
}
defer c.closeBody(r)

return model.BuildResponse(r), nil
}

func (c *ClientPP) StoreOAuth2App(oauth2App apps.OAuth2App) (*model.Response, error) {
r, err := c.DoAPIPOST(c.apipath(appspath.OAuth2App), utils.ToJSON(oauth2App)) // nolint:bodyclose
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions apps/path/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
OAuth2User = "/oauth2/user"
Subscribe = "/subscribe"
Unsubscribe = "/unsubscribe"
TimerCreate = "/timer"

// Invoke.
Call = "/call"
Expand Down
46 changes: 46 additions & 0 deletions apps/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package apps

import (
"time"

"github.com/hashicorp/go-multierror"

"github.com/mattermost/mattermost-plugin-apps/utils"
)

// Timer s submitted by an app to the Timer API. It determines when
// the app would like to be notified, and how these notifications
// should be invoked.
type Timer struct {
// At is the unix time in milliseconds when the timer should be executed.
At int64 `json:"at"`

// Call is the (one-way) call to make upon the timers execution.
Call Call `json:"call"`

// ChannelID is a channel ID that is used for expansion of the Call (optional).
ChannelID string `json:"channel_id,omitempty"`
// TeamID is a team ID that is used for expansion of the Call (optional).
TeamID string `json:"team_id,omitempty"`
}

func (t Timer) Validate() error {
var result error
emptyCall := Call{}
if t.Call == emptyCall {
result = multierror.Append(result, utils.NewInvalidError("call must not be empty"))
}

if t.At <= 0 {
result = multierror.Append(result, utils.NewInvalidError("at must be positive"))
}

if time.Until(time.UnixMilli(t.At)) < 1*time.Second {
result = multierror.Append(result, utils.NewInvalidError("at most be at least 1 second in the future"))
}

return result
}
1 change: 1 addition & 0 deletions build/custom.mk
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ifneq ($(HAS_SERVER),)
mockgen -destination server/mocks/mock_upstream/mock_upstream.go github.com/mattermost/mattermost-plugin-apps/upstream Upstream
mockgen -destination server/mocks/mock_store/mock_appstore.go github.com/mattermost/mattermost-plugin-apps/server/store AppStore
mockgen -destination server/mocks/mock_store/mock_session.go github.com/mattermost/mattermost-plugin-apps/server/store SessionStore
mockgen -destination server/mocks/mock_store/mock_app.go github.com/mattermost/mattermost-plugin-apps/server/store AppStore
endif

## Generates mock golang interfaces for testing
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/hashicorp/go-getter v1.6.2
github.com/hashicorp/go-multierror v1.1.1
github.com/mattermost/mattermost-plugin-api v0.1.1
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e
// mmgoget: github.com/mattermost/mattermost-server/[email protected] is replaced by -> github.com/mattermost/mattermost-server/v6@ea08d47f60
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051
github.com/nicksnyder/go-i18n/v2 v2.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,8 @@ github.com/mattermost/ldap v0.0.0-20201202150706-ee0e6284187d h1:/RJ/UV7M5c7L2TQ
github.com/mattermost/ldap v0.0.0-20201202150706-ee0e6284187d/go.mod h1:HLbgMEI5K131jpxGazJ97AxfPDt31osq36YS1oxFQPQ=
github.com/mattermost/logr/v2 v2.0.15 h1:+WNbGcsc3dBao65eXlceB6dTILNJRIrvubnsTl3zBew=
github.com/mattermost/logr/v2 v2.0.15/go.mod h1:mpPp935r5dIkFDo2y9Q87cQWhFR/4xXpNh0k/y8Hmwg=
github.com/mattermost/mattermost-plugin-api v0.1.1 h1:bNnPbWCLWZpT/k2kjUxNnzCfUggU8WKs2ddz7hNjg1U=
github.com/mattermost/mattermost-plugin-api v0.1.1/go.mod h1:9yZhtg0bBj3kqSTjXnjYBMZoTsWbe3ajdFMdl9/Jz34=
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e h1:7iT66sN3DzSg4ZrVpSf4igNHkcoEZhfj0/q2JoQauTQ=
github.com/mattermost/mattermost-plugin-api v0.1.2-0.20221110071900-f8b73bc6795e/go.mod h1:9yZhtg0bBj3kqSTjXnjYBMZoTsWbe3ajdFMdl9/Jz34=
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051 h1:bL3nQUUQmQotteHuA6ltKga3S3PbR03ElM5qJRXSeyY=
github.com/mattermost/mattermost-server/v6 v6.0.0-20230113170349-ea08d47f6051/go.mod h1:U3gSM0I15WSMHPpDEU30mmc4JrbSDk+8F1+MFLOHWD0=
github.com/mattermost/morph v1.0.5-0.20221115094356-4c18a75b1f5e h1:VfNz+fvJ3DxOlALM22Eea8ONp5jHrybKBCcCtDPVlss=
Expand Down
49 changes: 45 additions & 4 deletions server/appservices/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
package appservices

import (
"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-api/cluster"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
"github.com/mattermost/mattermost-plugin-apps/server/store"
"github.com/mattermost/mattermost-plugin-apps/utils"
)

type Service interface {
Expand All @@ -17,6 +23,10 @@ type Service interface {
Unsubscribe(*incoming.Request, apps.Event) error
UnsubscribeApp(*incoming.Request, apps.AppID) error

// Timer

CreateTimer(*incoming.Request, apps.Timer) error

// KV

KVSet(_ *incoming.Request, prefix, id string, data []byte) (bool, error)
Expand All @@ -33,14 +43,45 @@ type Service interface {
GetOAuth2User(_ *incoming.Request) ([]byte, error)
}

type Caller interface {
InvokeCall(*incoming.Request, apps.CallRequest) (*apps.App, apps.CallResponse)
NewIncomingRequest() *incoming.Request
}

type AppServices struct {
store *store.Service
store *store.Service
scheduler *cluster.JobOnceScheduler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my biggest issue with this PR. JobOnceScheduler relies on KVList. plugin-apps stores all of the app keys in its keyspace (session and ouath2 tokens), so the list is going to be huge and KVList slow.

2/5 consider mattermost/mattermost#21653

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference: cluster.JobOnceScheduler calls KVList on Start() and every 5 minutes.

Copy link
Contributor Author

@hanzei hanzei Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate on how a solution that uses mattermost/mattermost#21653 would work?

I would assume only the cluster leader executes timers, correct? Once a plugin instance receives the request to create a new timer, it needs to communicate that information to the leader. How would that work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've talked with @levb offline. The solutions in #375 (comment) are the path forward, but require the cache store from #458.

caller Caller

conf config.Service
log utils.Logger
}

var _ Service = (*AppServices)(nil)

func NewService(store *store.Service) *AppServices {
return &AppServices{
store: store,
// SetCaller must be called before calling any other methods of AppsServies.
// TODO: Remove this uggly hack.
func (a *AppServices) SetCaller(caller Caller) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we have a dummy function in the NewService set to something 'default' as a Caller, so we could set it up later when needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would that default Caller look like?

Copy link
Contributor

@javaguirre javaguirre Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is not creating a default for Caller, but AppServices doing too much, why not having different services providing their own business logic? I don't think a subscription app service would need any config to create a timer, right?

I need to think more about it, but if you want we can talk so I can be more helpful.

Copy link
Contributor

@javaguirre javaguirre Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this is what I meant (I did it fast, so it's not accurate), let me know what you think and why/why not do this (I believe a decision was made before this PR, so I'm interested) :-)

// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package appservices

import (
	"github.com/pkg/errors"

	"github.com/mattermost/mattermost-plugin-api/cluster"

	"github.com/mattermost/mattermost-plugin-apps/apps"
	"github.com/mattermost/mattermost-plugin-apps/server/config"
	"github.com/mattermost/mattermost-plugin-apps/server/incoming"
	"github.com/mattermost/mattermost-plugin-apps/server/store"
	"github.com/mattermost/mattermost-plugin-apps/utils"
)

type AppSubscriptionService interface {
	// Subscriptions

	Subscribe(*incoming.Request, apps.Subscription) error
	GetSubscriptions(*incoming.Request) ([]apps.Subscription, error)
	Unsubscribe(*incoming.Request, apps.Event) error
	UnsubscribeApp(*incoming.Request, apps.AppID) error
}

type AppSubscriptionService interface {
	// Timer

	CreateTimer(*incoming.Request, apps.Timer) error
}

type AppKVService interface {
    // KV

	KVSet(_ *incoming.Request, prefix, id string, data []byte) (bool, error)
	KVGet(_ *incoming.Request, prefix, id string) ([]byte, error)
	KVDelete(_ *incoming.Request, prefix, id string) error
	KVList(_ *incoming.Request, namespace string, processf func(key string) error) error
	KVDebugInfo(*incoming.Request) (*store.KVDebugInfo, error)
	KVDebugAppInfo(*incoming.Request, apps.AppID) (*store.KVDebugAppInfo, error)


}
	
type AppOAuthService interface {
    	// Remote (3rd party) OAuth2
	StoreOAuth2App(_ *incoming.Request, data []byte) error
	StoreOAuth2User(_ *incoming.Request, data []byte) error
	GetOAuth2User(_ *incoming.Request) ([]byte, error)
}

type Caller interface {
	InvokeCall(*incoming.Request, apps.CallRequest) (*apps.App, apps.CallResponse)
	NewIncomingRequest() *incoming.Request
}

type AppService struct {
	store     *store.Service


	conf config.Service
	log  utils.Logger
}

type AppTimerService struct {
    service *AppService
    
    scheduler *cluster.JobOnceScheduler
	caller    Caller
}

type AppSubscriptionService struct {
    service *AppService
}

a.caller = caller
}

func NewService(log utils.Logger, confService config.Service, store *store.Service, scheduler *cluster.JobOnceScheduler) (*AppServices, error) {
service := &AppServices{
store: store,
scheduler: scheduler,
conf: confService,
log: log,
}

err := scheduler.SetCallback(service.ExecuteTimer)
if err != nil {
return nil, errors.Wrap(err, "failed to set timer callback")
}

err = scheduler.Start()
if err != nil {
return nil, errors.Wrap(err, "failed to start timer scheduler")
}

return service, nil
}
109 changes: 109 additions & 0 deletions server/appservices/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package appservices

import (
"context"
"strconv"
"time"

"github.com/pkg/errors"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
)

type storedTimer struct {
Call apps.Call `json:"call"`
AppID apps.AppID `json:"app_id"`
UserID string `json:"user_id"`
ChannelID string `json:"channel_id,omitempty"`
TeamID string `json:"team_id,omitempty"`
}

func (t storedTimer) Key(appID apps.AppID, at int64) string {
return string(appID) + t.UserID + strconv.FormatInt(at, 10)
}

func (t storedTimer) Loggable() []interface{} {
props := []interface{}{"user_id", t.UserID}
props = append(props, "app_id", t.AppID)
if t.ChannelID != "" {
props = append(props, "call_team_id", t.TeamID)
}
if t.TeamID != "" {
props = append(props, "call_channel_id", t.ChannelID)
}
return props
}

func (a *AppServices) CreateTimer(r *incoming.Request, t apps.Timer) error {
err := r.Check(
r.RequireActingUser,
r.RequireSourceApp,
t.Validate,
)
if err != nil {
return err
}

st := storedTimer{
Call: t.Call,
AppID: r.SourceAppID(),
UserID: r.ActingUserID(),
ChannelID: t.ChannelID,
TeamID: t.TeamID,
}

_, err = a.scheduler.ScheduleOnce(st.Key(r.SourceAppID(), t.At), time.UnixMilli(t.At), st)
if err != nil {
return errors.Wrap(err, "faild to schedule timer job")
}

return nil
}

func (a *AppServices) ExecuteTimer(key string, props interface{}) {
t, ok := props.(storedTimer)
if !ok {
a.log.Debugw("Timer contained unknown props. Inoring the timer.", "key", key, "props", props)
return
}

r := a.caller.NewIncomingRequest()

r.Log = r.Log.With(t)

ctx, cancel := context.WithTimeout(context.Background(), config.RequestTimeout)
defer cancel()
r = r.WithCtx(ctx)

r = r.WithDestination(t.AppID)
r = r.WithActingUserID(t.UserID)

context := &apps.Context{
UserAgentContext: apps.UserAgentContext{
AppID: t.AppID,
TeamID: t.TeamID,
ChannelID: t.ChannelID,
},
}

creq := apps.CallRequest{
Call: t.Call,
Context: *context,
}
r.Log = r.Log.With(creq)
_, cresp := a.caller.InvokeCall(r, creq)
if cresp.Type == apps.CallResponseTypeError {
if a.conf.Get().DeveloperMode {
r.Log.WithError(cresp).Errorf("Timer execute failed")
}
return
}
r.Log = r.Log.With(cresp)

r.Log.Debugf("Timer executed")
}
1 change: 1 addition & 0 deletions server/httpin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewService(proxy proxy.Service, appservices appservices.Service, conf confi
h.HandleFunc(path.Subscribe, h.GetSubscriptions).Methods(http.MethodGet)
h.HandleFunc(path.Subscribe, h.Subscribe).Methods(http.MethodPost)
h.HandleFunc(path.Unsubscribe, h.Unsubscribe).Methods(http.MethodPost)
h.HandleFunc(path.TimerCreate, h.CreateTimer).Methods(http.MethodPost)

// Admin API, can be used by plugins, external services, or the user agent.
h.HandleFunc(path.DisableApp, h.DisableApp).Methods(http.MethodPost)
Expand Down
35 changes: 35 additions & 0 deletions server/httpin/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2020-present Mattermost, Inc. All Rights Reserved.
// See License for license information.

package httpin

import (
"encoding/json"
"net/http"

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/server/incoming"
"github.com/mattermost/mattermost-plugin-apps/utils/httputils"
)

// CreateTimer create or updates a new statefull timer.
//
// Path: /api/v1/timer
// Method: POST
// Input: JSON {at, call, state}
// Output: None
func (s *Service) CreateTimer(r *incoming.Request, w http.ResponseWriter, req *http.Request) {
var t apps.Timer

err := json.NewDecoder(req.Body).Decode(&t)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

err = s.AppServices.CreateTimer(r, t)
if err != nil {
http.Error(w, err.Error(), httputils.ErrorToStatus(err))
return
}
}
10 changes: 9 additions & 1 deletion server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ func (p *Plugin) OnActivate() (err error) {
return errors.Wrap(err, "failed to initialize persistent store")
}
p.store.App.InitBuiltin(builtin.App(conf))
p.appservices = appservices.NewService(p.store)
scheduler := cluster.GetJobOnceScheduler(p.API)
appservice, err := appservices.NewService(log, p.conf, p.store, scheduler)
if err != nil {
return errors.Wrapf(err, "failed to initialize appservices")
}
p.appservices = appservice

p.sessionService = session.NewService(mm, p.store)
log.Debugf("initialized API and persistent store")

Expand All @@ -110,6 +116,7 @@ func (p *Plugin) OnActivate() (err error) {
if err != nil {
return errors.Wrapf(err, "failed creating cluster mutex")
}

p.proxy = proxy.NewService(p.conf, p.store, mutex, p.httpOut, p.sessionService, p.appservices)
err = p.proxy.Configure(conf, log)
if err != nil {
Expand All @@ -121,6 +128,7 @@ func (p *Plugin) OnActivate() (err error) {
)
log.Debugf("initialized the app proxy")

appservice.SetCaller(p.proxy)
p.httpIn = httpin.NewService(p.proxy, p.appservices, p.conf)
log.Debugf("initialized incoming HTTP")

Expand Down
1 change: 1 addition & 0 deletions server/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/mattermost/mattermost-plugin-apps/apps"
"github.com/mattermost/mattermost-plugin-apps/apps/appclient"

"github.com/mattermost/mattermost-plugin-apps/server/appservices"
"github.com/mattermost/mattermost-plugin-apps/server/config"
"github.com/mattermost/mattermost-plugin-apps/server/httpout"
Expand Down
Loading