-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathdispatcher.go
229 lines (196 loc) · 5.93 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
* Echotron
* Copyright (C) 2018 The Echotron Contributors
*
* Echotron is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Echotron is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package echotron
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"sync"
)
// smap is a typed wrapper around sync.Map that stores the Bot sessions
// keyed by chat ID (int64), so that they can be accessed from several
// goroutines without additional locking.
type smap struct {
// m is the underlying concurrent map; keys are int64 chat IDs and
// values are Bot instances (see load, store and delete).
m sync.Map
}
// load looks up the Bot stored under the given chat ID.
// The boolean result reports whether an entry was found; when it is false
// the returned Bot is nil.
func (s *smap) load(id int64) (Bot, bool) {
	value, found := s.m.Load(id)
	if !found {
		return nil, found
	}
	// Only Bot values are ever put into the map by store.
	return value.(Bot), found
}
// store saves bot under the given chat ID, replacing any Bot previously
// stored under the same ID.
func (s *smap) store(id int64, bot Bot) {
s.m.Store(id, bot)
}
// delete removes the Bot stored under the given chat ID, if any.
func (s *smap) delete(id int64) {
s.m.Delete(id)
}
// Bot is the interface that must be implemented by your own bot type.
// The Dispatcher keeps one Bot instance per chat ID, so each instance
// represents an open session with a chat on Telegram.
type Bot interface {
// Update will be called upon receiving any update from Telegram.
// The Dispatcher invokes it in a new goroutine for every update.
Update(*Update)
}
// NewBotFn is the constructor used by the Dispatcher to create a Bot for a
// chat ID. It is called every time echotron receives an update with a chat ID
// that has no session yet, and every time AddSession is called.
type NewBotFn func(chatId int64) Bot
// The Dispatcher passes the updates from the Telegram Bot API to the Bot instance
// associated with each chat ID. When a new chat ID is found, the provided function
// of type NewBotFn will be called.
type Dispatcher struct {
// sessions maps each chat ID to its Bot instance; it is a concurrency-safe
// replacement for a plain map[int64]Bot.
sessions smap
// newBot creates the Bot for a chat ID that has no session yet.
newBot NewBotFn
// updates carries the incoming updates (from polling or from the webhook
// handler) to the listen goroutine.
updates chan *Update
// httpServer is the optional custom server set through SetHTTPServer;
// when nil, the default net/http server is used by ListenWebhookOptions.
httpServer *http.Server
// api is the Telegram Bot API client used to manage webhooks and fetch updates.
api API
}
// NewDispatcher creates a Dispatcher bound to the given bot token and starts
// the background goroutine that forwards every incoming update to the Update
// method of the Bot associated with its chat ID.
// Whenever an update arrives for a chat ID without a session, newBotFn is
// called first to create the Bot for it.
func NewDispatcher(token string, newBotFn NewBotFn) *Dispatcher {
	dsp := new(Dispatcher)
	dsp.api = NewAPI(token)
	dsp.newBot = newBotFn
	// Unbuffered: producers block until the listen goroutine takes the update.
	dsp.updates = make(chan *Update)

	go dsp.listen()

	return dsp
}
// DelSession deletes the Bot instance, seen as a session, associated with
// chatID from the map holding all of them. It does nothing if no session
// exists for that chat ID.
func (d *Dispatcher) DelSession(chatID int64) {
d.sessions.delete(chatID)
}
// AddSession creates a Bot for chatID on demand, without waiting for an
// update from that chat, and registers it as the session for that chat ID.
// A session already stored under the same chat ID is replaced.
func (d *Dispatcher) AddSession(chatID int64) {
	session := d.newBot(chatID)
	d.sessions.store(chatID, session)
}
// Poll starts long polling with the default settings: pending updates are
// dropped and the long-polling timeout is set to 120.
// It is a convenience wrapper around PollOptions and returns its error.
func (d *Dispatcher) Poll() error {
	defaults := UpdateOptions{Timeout: 120}
	return d.PollOptions(true, defaults)
}
// PollOptions starts the polling loop so that the dispatcher calls the Update
// method of the matching Bot upon receiving any update from Telegram.
//
// If dropPendingUpdates is true, the updates returned by the first request are
// acknowledged but not dispatched. opts is passed to every GetUpdates call;
// its Offset is advanced past the last update received.
//
// The function only returns when deleting the webhook or fetching the updates
// fails, and it returns that error.
func (d *Dispatcher) PollOptions(dropPendingUpdates bool, opts UpdateOptions) error {
	// Delete the webhook, if present, to run in long polling mode.
	if _, err := d.api.DeleteWebhook(dropPendingUpdates); err != nil {
		return err
	}

	// The very first request is sent with a zero timeout so that it returns
	// immediately; the caller's timeout is restored right after it.
	requestedTimeout := opts.Timeout
	opts.Timeout = 0

	for firstRun := true; ; firstRun = false {
		res, err := d.api.GetUpdates(&opts)
		if err != nil {
			return err
		}

		// Skip dispatching only for the first batch when pending updates
		// must be dropped.
		skip := dropPendingUpdates && firstRun
		if !skip {
			for i := range res.Result {
				d.updates <- res.Result[i]
			}
		}

		// Acknowledge everything received so far.
		if count := len(res.Result); count > 0 {
			opts.Offset = res.Result[count-1].ID + 1
		}

		if firstRun {
			opts.Timeout = requestedTimeout
		}
	}
}
// instance returns the Bot associated with chatID, creating it through
// newBot and storing it in the sessions map when none exists yet.
func (d *Dispatcher) instance(chatID int64) Bot {
	if existing, found := d.sessions.load(chatID); found {
		return existing
	}

	created := d.newBot(chatID)
	d.sessions.store(chatID, created)
	return created
}
// listen receives updates from the updates channel until it is closed and
// hands each one to the Bot of its chat, running Bot.Update in a new goroutine.
func (d *Dispatcher) listen() {
	for {
		upd, open := <-d.updates
		if !open {
			return
		}

		// The session lookup happens here, in the listen goroutine; only
		// the call to Update runs concurrently.
		session := d.instance(upd.ChatID())
		go session.Update(upd)
	}
}
// ListenWebhook is a wrapper function for ListenWebhookOptions.
// It sets the webhook to webhookURL without dropping pending updates and
// without additional webhook options, then serves incoming updates.
// See ListenWebhookOptions for the expected URL format.
func (d *Dispatcher) ListenWebhook(webhookURL string) error {
return d.ListenWebhookOptions(webhookURL, false, nil)
}
// ListenWebhookOptions sets a webhook and listens for incoming updates.
// The webhookURL should be provided in the following format:
// '<scheme>://<hostname>:<port>/<path>', e.g. 'https://example.com:443/bot_token'.
// ListenWebhookOptions communicates the webhook URL '<hostname>/<path>' to
// Telegram and then runs a web server that handles '<path>'.
//
// If a custom server was provided through SetHTTPServer, that server is
// started with its own settings: the webhook path is added in front of its
// current Handler, which keeps serving every other path. A nil Handler falls
// back to http.DefaultServeMux, as net/http does for servers without a
// Handler. Otherwise the webhook handler is registered on
// http.DefaultServeMux and the default server listens on ':<port>'.
//
// It returns an error if webhookURL cannot be parsed, if setting the webhook
// fails, or when the HTTP server stops.
func (d *Dispatcher) ListenWebhookOptions(webhookURL string, dropPendingUpdates bool, opts *WebhookOptions) error {
	u, err := url.Parse(webhookURL)
	if err != nil {
		return err
	}
	route := u.EscapedPath()

	whURL := fmt.Sprintf("%s%s", u.Hostname(), route)
	if _, err = d.api.SetWebhook(whURL, dropPendingUpdates, opts); err != nil {
		return err
	}

	if d.httpServer != nil {
		// ServeMux.Handle panics on a nil handler, while a nil
		// http.Server.Handler legitimately means "use DefaultServeMux".
		fallback := d.httpServer.Handler
		if fallback == nil {
			fallback = http.DefaultServeMux
		}

		mux := http.NewServeMux()
		mux.Handle("/", fallback)
		mux.HandleFunc(route, d.HandleWebhook)
		d.httpServer.Handler = mux
		return d.httpServer.ListenAndServe()
	}

	http.HandleFunc(route, d.HandleWebhook)
	return http.ListenAndServe(fmt.Sprintf(":%s", u.Port()), nil)
}
// SetHTTPServer allows to set a custom http.Server for ListenWebhook and ListenWebhookOptions.
// The server is only stored here; it is started by ListenWebhookOptions,
// so it must be set before calling either of those methods to take effect.
func (d *Dispatcher) SetHTTPServer(s *http.Server) {
d.httpServer = s
}
// HandleWebhook is the http.HandlerFunc for the webhook URL.
// It decodes the request body as an Update and queues it for dispatching.
// Requests whose body cannot be read or parsed are logged and dropped;
// nothing is written to the response in either case.
// Useful if you already have an HTTP server running and want to route the
// request yourself.
func (d *Dispatcher) HandleWebhook(w http.ResponseWriter, r *http.Request) {
	payload, err := readRequest(r)
	if err == nil {
		update := new(Update)
		if err = json.Unmarshal(payload, update); err == nil {
			d.updates <- update
			return
		}
	}

	// Reached only when reading or decoding the request failed.
	log.Println("echotron.Dispatcher", "HandleWebhook", err)
}
// readRequest returns the whole body of r, transparently decompressing it
// when the Content-Encoding header is exactly "gzip".
// If the gzip stream cannot be opened, it returns an empty slice and the error.
func readRequest(r *http.Request) ([]byte, error) {
	if r.Header.Get("Content-Encoding") != "gzip" {
		return io.ReadAll(r.Body)
	}

	zr, err := gzip.NewReader(r.Body)
	if err != nil {
		return []byte{}, err
	}
	defer zr.Close()

	return io.ReadAll(zr)
}