Merge branch 'master' into DEV-3105-load-test-alerting

This commit is contained in:
morestatic 2023-08-23 12:07:00 +07:00
commit aac89a2fb6
12 changed files with 257 additions and 35 deletions

View File

@ -0,0 +1,87 @@
---
title: "Fine-tuning the Rport Server: Understanding `max_concurrent_ssh_handshake`"
weight: 30
slug: "fine-tuing-rport-server"
---
{{< toc >}}
The `max_concurrent_ssh_handshake` parameter is a pivotal configuration detail in the rport server.
Here, we delve deep into what it represents and its implications, especially when dealing with a large client base.
---
## What is `max_concurrent_ssh_handshake`?
The `max_concurrent_ssh_handshake` parameter in the rport server was introduced as a defensive measure against
the "thundering herd" effect. This effect becomes notably pronounced post server downtimes,
when a deluge of clients try to reestablish their SSH connections simultaneously.
Handling numerous connection attempts and the extensive data associated with each handshake
can heavily tax both the CPU and network bandwidth, frequently leading to subsequent connection timeouts.
Through capping concurrent handshakes,
this parameter aspires to optimize server resource allocation and guarantee a more streamlined reconnection process.
---
## Addressing User Queries
### 1. Can this be set to "No limit" for scalable environments?
Setting this parameter to "No limit" isn't viable. Without a stipulated cap,
the server risks undue strain, notably during peak reconnection periods.
### 2. What are the effects of raising this parameter?
A loftier `max_concurrent_ssh_handshake` value allows a greater number of simultaneous SSH handshake processes.
However, it's crucial to remain wary of potential server resource saturation, particularly during mass reconnections.
### 3. How should we scale?
- **Infrastructure:** Ensure your server resources
— CPU, memory, and network — are in sync with expected peak loads, especially post-downtime.
- **Client Strategy:** Pre-configured clients come with a growing backoff mechanism,
aiding the server during peak times. This feature should inform the server's scaling strategy.
- **Tuning:**
- Modify the `max_concurrent_ssh_handshake` based on past data,
anticipated load patterns, and server performance metrics post-downtimes.
- **Binary Search Tuning**: Begin with the total client count and methodically halve the `max_concurrent_ssh_handshake` value until a stable configuration is pinpointed.
- **Number of cores**: from our experimentation one of the limits was CPU and in this scenario
we found the total number of cores divided by 2 to yield most stable results.
### 4. What happens if we set the baseline to 100?
Setting the `max_concurrent_ssh_handshake` to a value like 100 caps the server
to processing a maximum of 100 concurrent SSH handshakes.
However, there are associated cascading implications:
- **CPU utilization**: SSH handshakes are CPU intensive and increasing this value to 100 with only 2 slow cores will
cause a situation in which all 100 handshakes compete for CPU time and take so long to process that they all timeout and server can't establish any connection.
While on 256 core machine it would be conservative setting.
- **Connection Queueing**: A low threshold can result in many clients queueing up, leading to prolonged waits.
- **Client Timeouts**: Protracted waits can cause client-side timeouts,
notably problematic if most clients are attempting simultaneous reconnections.
- **Exponential Backoff**: Given the client-side exponential backoff strategy,
timeouts can lead to elongated durations before subsequent reconnection attempts.
Consequently, a lower handshake limit can result in extended timeframes
before all clients manage successful reconnections.
---
## Additional Recommendations
- **Planning for Downtimes:** Forewarn of upcoming server downtimes,
encourage staggered client reconnections, or consider rolling restarts to alleviate the thundering herd effect.
- **Monitoring:** Monitor essential metrics like CPU usage, network bandwidth,
and connection success ratios, with a keen eye on post-downtime scenarios.
- **Testing:** Create controlled test environments to simulate post-downtime connection dynamics,
ensuring optimal live configurations.
- **Feedback Loop:** Create alert mechanisms for potential resource overconsumption instances,
such as CPU spikes or bandwidth bottlenecks, ensuring timely interventions.

View File

@ -39,7 +39,7 @@ type Config struct {
}
type Service interface {
Run(ctx context.Context, notificationDispatcher notifications.Dispatcher, maxWorkers int)
Run(ctx context.Context, scriptsDir string, notificationDispatcher notifications.Dispatcher, maxWorkers int)
Stop() (err error)
LoadDefaultRuleSet() (err error)
@ -60,4 +60,6 @@ type Service interface {
SetProblemActive(pid rules.ProblemID) (err error)
SetProblemResolved(pid rules.ProblemID, resolvedAt time.Time) (err error)
GetLatestProblems(limit int) (problems []*rules.Problem, err error)
GetSampleData(choice string) (sampleData *rundata.SampleData, err error)
}

View File

@ -98,6 +98,16 @@ func newTestTemplates() map[templates.TemplateID]templates.Template {
Body: "The client with ID: {{.Client.ID}} has triggered rule ID: {{.Rule.ID}} BODY2",
HTML: true,
Recipients: []string{"t3@test.com", "t4@test.com"},
ScriptDataTemplates: &templates.ScriptDataTemplates{
Subject: "{{.Outcome}} for {{.Rule.ID}} SUBJECT2",
Severity: "{{.Rule.Severity}}",
Client: "{{.Client.ID}}",
WebhookURL: "https://test.com/rules/{{.Rule.ID}}",
Custom: templates.CustomData{
"key1": "value1",
"key2": "value2",
},
},
},
"t3": {
ID: "t3",
@ -180,7 +190,7 @@ func NewMockServiceProvider() (mp *MockServiceProvider) {
return mp
}
func (mp *MockServiceProvider) Run(_ context.Context, _ notifications.Dispatcher, _ int) {
func (mp *MockServiceProvider) Run(_ context.Context, _ string, _ notifications.Dispatcher, _ int) {
}
func (mp *MockServiceProvider) Stop() (err error) {
@ -292,3 +302,18 @@ func (mp *MockServiceProvider) GetLatestProblems(_ int) (problems []*rules.Probl
})
return problems, nil
}
func (mp *MockServiceProvider) GetSampleData(choice string) (sampleData *rundata.SampleData, err error) {
testRunData := rundata.SampleData{
CL: []clientupdates.Client{{ID: "linux"}},
M: []measures.Measure{{ClientID: "linux"}},
}
if choice == "windows" {
testRunData = rundata.SampleData{
CL: []clientupdates.Client{{ID: "windows"}},
M: []measures.Measure{{ClientID: "windows"}},
}
}
return &testRunData, nil
}

View File

@ -18,6 +18,13 @@ type RunData struct {
WaitMilliSecs int `json:"delay_ms"`
}
type RecordingStatus int
type SampleData struct {
CL []clientupdates.Client `json:"client_data"`
M []measures.Measure `json:"measurements"`
}
type NotificationResult struct {
RefID refs.Identifiable `json:"ref_id"`
Notification notifications.NotificationData `json:"notification"`

View File

@ -17,15 +17,21 @@ var (
ErrMissingScriptSubjectMsg = "missing data subject"
ErrBadlyFormedWebhookMsg = "badly formed webhook"
ErrMissingWebhookURLHostMsg = "missing host in webhook url"
ErrScriptNotFoundMsg = "script %s not found in %s"
ErrFailedToStatScriptFile = "failed to stat script file %s"
ErrScriptNotExecutableMsg = "script %s not executable"
)
type TemplateID string
type CustomData map[string]string
type ScriptDataTemplates struct {
Subject string `json:"subject"`
Severity string `json:"severity"`
Client string `json:"client"`
WebhookURL string `json:"webhook_url"`
Subject string `json:"subject"`
Severity string `json:"severity"`
Client string `json:"client"`
WebhookURL string `json:"webhook_url"`
Custom CustomData `json:"custom_data"`
}
type Template struct {

View File

@ -124,9 +124,11 @@
## This is a performance enhancement. Do not turn off, unless you have good reasons.
#sqlite_wal = true
## Limits the number of ssh handshakes that the server will handle concurrently. Too many in progress SSH handshakes
## together will slow down the server's ability to perform other work. This can particularly impact server startup
## when many clients connect at similar times. A very slow server can also result in strange client reconnect issues.
## Limits the concurrent SSH handshakes to prevent resource strain, especially during mass reconnections after downtimes.
## A low value can lead to client timeouts and extended reconnect durations due to exponential backoff.
## However, setting a value too high might overburden the server's CPU and network bandwidth during peak connection periods.
## Adjust based on server resources, anticipated client load, and post-downtime performance.
## For comprehensive insights and guidance, refer to https://oss.rport.io/advanced/fine-tuning-the-rport-server/.
## Default is 4.
#max_concurrent_ssh_handshakes = 4

View File

@ -4,10 +4,13 @@ import (
"context"
"net/http"
"github.com/gorilla/mux"
rportplus "github.com/realvnc-labs/rport/plus"
alertingcap "github.com/realvnc-labs/rport/plus/capabilities/alerting"
"github.com/realvnc-labs/rport/plus/capabilities/alerting/entities/rundata"
"github.com/realvnc-labs/rport/server/api"
"github.com/realvnc-labs/rport/server/routes"
)
func (al *APIListener) getAlertingCapability() (capEx alertingcap.CapabilityEx, statusCode int, err error) {
@ -28,6 +31,7 @@ func (al *APIListener) handleTestRules(w http.ResponseWriter, r *http.Request) {
asCap, status, err := al.getAlertingCapability()
if err != nil {
al.jsonErrorResponse(w, status, err)
return
}
runData := rundata.RunData{}
@ -56,3 +60,24 @@ func (al *APIListener) handleTestRules(w http.ResponseWriter, r *http.Request) {
al.writeJSONResponse(w, http.StatusOK, response)
}
func (al *APIListener) handleGetSampleData(w http.ResponseWriter, r *http.Request) {
as, status, err := al.getAlertingService()
if err != nil {
al.jsonErrorResponse(w, status, err)
return
}
vars := mux.Vars(r)
choice := vars[routes.ParamSampleDataChoice]
sampleData, err := as.GetSampleData(choice)
if err != nil {
al.jsonErrorResponse(w, status, err)
return
}
response := api.NewSuccessPayload(sampleData)
al.writeJSONResponse(w, http.StatusOK, response)
}

View File

@ -15,6 +15,7 @@ import (
alertingcap "github.com/realvnc-labs/rport/plus/capabilities/alerting"
"github.com/realvnc-labs/rport/plus/capabilities/alerting/alertingmock"
"github.com/realvnc-labs/rport/plus/capabilities/alerting/entities/rules"
"github.com/realvnc-labs/rport/plus/capabilities/alerting/entities/rundata"
"github.com/realvnc-labs/rport/plus/capabilities/alerting/entities/templates"
"github.com/realvnc-labs/rport/server/api/authorization"
"github.com/realvnc-labs/rport/server/api/users"
@ -50,6 +51,10 @@ type ProblemsResponse struct {
Data []*rules.Problem
}
type SampleDataResponse struct {
Data *rundata.SampleData
}
type plusManagerForMockAlerting struct {
cap map[string]rportplus.Capability
@ -287,17 +292,17 @@ func TestShouldReturnTemplate(t *testing.T) {
func TestShouldSaveTemplate(t *testing.T) {
al, mockAS := setup(t)
t1, err := mockAS.GetTemplate("t1")
t2, err := mockAS.GetTemplate("t2")
require.NoError(t, err)
t10 := *t1
t10.ID = ""
t20 := *t2
t20.ID = ""
t10JSON, err := json.Marshal(t10)
t20JSON, err := json.Marshal(t20)
require.NoError(t, err)
w := httptest.NewRecorder()
req := httptest.NewRequest("PUT", routes.AllRoutesPrefix+routes.AlertingServiceRoutesPrefix+routes.ASTemplatesRoute+"/t10", bytes.NewReader(t10JSON))
req := httptest.NewRequest("PUT", routes.AllRoutesPrefix+routes.AlertingServiceRoutesPrefix+routes.ASTemplatesRoute+"/t20", bytes.NewReader(t20JSON))
al.router.ServeHTTP(w, req)
@ -308,10 +313,16 @@ func TestShouldSaveTemplate(t *testing.T) {
t.Errorf("Expected status code %d, got %d", http.StatusOK, res.StatusCode)
}
savedTemplate, ok := mockAS.Templates["t10"]
savedTemplate, ok := mockAS.Templates["t20"]
require.True(t, ok)
assert.Equal(t, templates.TemplateID("t10"), savedTemplate.ID)
assert.Equal(t, templates.TemplateID("t20"), savedTemplate.ID)
assert.Equal(t, t20.Transport, savedTemplate.Transport)
assert.Equal(t, t20.Subject, savedTemplate.Subject)
assert.Equal(t, t20.Body, savedTemplate.Body)
assert.Equal(t, t20.HTML, savedTemplate.HTML)
assert.Equal(t, t20.ScriptDataTemplates, savedTemplate.ScriptDataTemplates)
assert.Equal(t, t20.Recipients, savedTemplate.Recipients)
}
func TestShouldDeleteTemplate(t *testing.T) {
@ -617,3 +628,57 @@ func TestShouldGetLatestProblemsWithSort(t *testing.T) {
assert.Equal(t, rules.RuleID("r1"), problemsInfo.Data[1].RuleID)
assert.Equal(t, rules.RuleID("r1"), problemsInfo.Data[2].RuleID)
}
func getSampleDataInfo(t *testing.T, w *httptest.ResponseRecorder) (sampleDataInfo SampleDataResponse) {
err := json.NewDecoder(w.Body).Decode(&sampleDataInfo)
assert.NoError(t, err)
return sampleDataInfo
}
func TestShouldGetSampleDataWindows(t *testing.T) {
al, _ := setup(t)
w := httptest.NewRecorder()
req := httptest.NewRequest("GET",
routes.AllRoutesPrefix+routes.AlertingServiceRoutesPrefix+routes.ASRuleSetRoute+routes.ASSampleDataRoute+"/windows",
nil)
al.router.ServeHTTP(w, req)
res := w.Result()
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
require.Equal(t, http.StatusOK, res.StatusCode)
}
sampleDataInfo := getSampleDataInfo(t, w)
assert.NotZero(t, sampleDataInfo.Data)
sampleData := sampleDataInfo.Data
assert.Equal(t, "windows", sampleData.CL[0].ID)
assert.Equal(t, "windows", sampleData.M[0].ClientID)
}
func TestShouldGetSampleDataLinux(t *testing.T) {
al, _ := setup(t)
w := httptest.NewRecorder()
req := httptest.NewRequest("GET",
routes.AllRoutesPrefix+routes.AlertingServiceRoutesPrefix+routes.ASRuleSetRoute+routes.ASSampleDataRoute+"/linux",
nil)
al.router.ServeHTTP(w, req)
res := w.Result()
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
require.Equal(t, http.StatusOK, res.StatusCode)
}
sampleDataInfo := getSampleDataInfo(t, w)
assert.NotZero(t, sampleDataInfo.Data)
sampleData := sampleDataInfo.Data
assert.Equal(t, "linux", sampleData.CL[0].ID)
assert.Equal(t, "linux", sampleData.M[0].ClientID)
}

View File

@ -245,6 +245,7 @@ func (al *APIListener) initRouter() {
secureASRouter.Handle(routes.ASTemplatesRoute+"/{"+routes.ParamTemplateID+"}", al.wrapAdminAccessMiddleware(http.HandlerFunc(al.handleSaveTemplate))).Methods(http.MethodPut)
secureASRouter.Handle(routes.ASRuleSetRoute+routes.ASRunTestRulesRoute, al.wrapAdminAccessMiddleware(http.HandlerFunc(al.handleTestRules))).Methods(http.MethodPut)
secureASRouter.Handle(routes.ASRuleSetRoute+routes.ASSampleDataRoute+"/{"+routes.ParamSampleDataChoice+"}", al.wrapAdminAccessMiddleware(http.HandlerFunc(al.handleGetSampleData))).Methods(http.MethodGet)
}
if rportplus.IsPlusOAuthEnabled(al.config.PlusConfig) {

View File

@ -605,9 +605,10 @@ func (cl *ClientListener) handleSSHRequests(clientLog *logger.DynamicLogger, cli
clientLog.Errorf("Failed to unmarshal save_measurement: %s", err)
continue
}
measurement.ClientID = clientID
measurement.ClientID = clientID
measurement.Timestamp = time.Now().UTC()
cl.server.monitoringModule.Notify(measurement)
if rportplus.IsPlusEnabled(cl.server.config.PlusConfig) {

View File

@ -1,20 +1,21 @@
package routes
const (
ParamClientID = "client_id"
ParamClientAuthID = "client_auth_id"
ParamUserID = "user_id"
ParamSessionID = "session_id"
ParamJobID = "job_id"
ParamGroupID = "group_id"
ParamTokenPrefix = "prefix"
ParamVaultValueID = "vault_value_id"
ParamScriptValueID = "script_value_id"
ParamCommandValueID = "command_value_id"
ParamGraphName = "graph_name"
ParamTemplateID = "template_id"
ParamProblemID = "problem_id"
ParamNotificationID = "notification_id"
ParamClientID = "client_id"
ParamClientAuthID = "client_auth_id"
ParamUserID = "user_id"
ParamSessionID = "session_id"
ParamJobID = "job_id"
ParamGroupID = "group_id"
ParamTokenPrefix = "prefix"
ParamVaultValueID = "vault_value_id"
ParamScriptValueID = "script_value_id"
ParamCommandValueID = "command_value_id"
ParamGraphName = "graph_name"
ParamTemplateID = "template_id"
ParamProblemID = "problem_id"
ParamNotificationID = "notification_id"
ParamSampleDataChoice = "sample_data_choice"
AllRoutesPrefix = "/api/v1"
AuthRoutesPrefix = "/auth"
@ -26,6 +27,7 @@ const (
ASTemplatesRoute = "/notification-templates"
ASProblemsRoute = "/problems"
ASRunTestRulesRoute = "/test"
ASSampleDataRoute = "/sample-data"
TotPRoutes = "/me/totp-secret"
Verify2FaRoute = "/verify-2fa"
FilesUploadRouteName = "files"

View File

@ -122,7 +122,7 @@ func NewServer(ctx context.Context, config *chconfig.Config, opts *ServerOpts) (
alertingCap := s.plusManager.GetAlertingCapabilityEx()
if alertingCap != nil {
s.alertingService, err = s.StartPlusAlertingService(ctx, alertingCap, config.Server.DataDir)
s.alertingService, err = s.StartPlusAlertingService(alertingCap, config.Server.DataDir)
if err != nil {
return nil, err
}
@ -298,7 +298,7 @@ func NewServer(ctx context.Context, config *chconfig.Config, opts *ServerOpts) (
if s.alertingService != nil {
dispatcher := notifications.NewDispatcher(s.apiListener.notificationsStorage)
s.alertingService.Run(ctx, dispatcher, maxAlertingWorkers)
s.alertingService.Run(ctx, config.Notifications.NotificationScriptDir, dispatcher, maxAlertingWorkers)
}
return s, nil
}
@ -311,8 +311,7 @@ func (s *Server) HandlePlusLicenseInfoAvailable() {
}
}
func (s *Server) StartPlusAlertingService(ctx context.Context,
alertingCap alertingcap.CapabilityEx,
func (s *Server) StartPlusAlertingService(alertingCap alertingcap.CapabilityEx,
dataDir string) (as alertingcap.Service, err error) {
opts := bbolt.DefaultOptions
bdb, err := bbolt.Open(dataDir+"/alerts.boltdb", 0600, opts)