Implement connection establishment via web sockets

This commit is contained in:
Martin 2020-07-05 23:53:06 +02:00
parent b903f584e0
commit d9bdad3326
12 changed files with 104 additions and 136 deletions

View File

@ -1,3 +1,5 @@
#pragma once
#include "CustomClient.h"
#include "Object.h"

View File

@ -40,6 +40,7 @@ const int HTTP_STATE_CONTENT=3;
const int HTTP_STATE_WAIT_FOR_THREAD=4;
const int HTTP_STATE_KEEPALIVE=5;
const int HTTP_STATE_DONE=6;
const int HTTP_STATE_WEBSOCKET = 7;
const int HTTP_MAX_KEEPALIVE=15000;
@ -67,6 +68,16 @@ void CHTTPClient::init_mutex(void)
void CHTTPClient::destroy_mutex(void)
{
Server->destroy(share_mutex);
}
bool CHTTPClient::wantReceive(void)
{
return http_g_state != HTTP_STATE_WEBSOCKET;
}
bool CHTTPClient::closeSocket(void)
{
return http_g_state != HTTP_STATE_WEBSOCKET;
}
void CHTTPClient::ReceivePackets(IRunOtherCallback* run_other)
@ -99,7 +110,9 @@ void CHTTPClient::ReceivePackets(IRunOtherCallback* run_other)
{
if( processRequest() )
{
http_g_state=HTTP_STATE_WAIT_FOR_THREAD;
if (http_g_state != HTTP_STATE_WEBSOCKET)
http_g_state = HTTP_STATE_WAIT_FOR_THREAD;
}
else
do_quit=true;
@ -117,6 +130,11 @@ void CHTTPClient::ReceivePackets(IRunOtherCallback* run_other)
bool CHTTPClient::Run(IRunOtherCallback* run_other)
{
if (http_g_state == HTTP_STATE_WEBSOCKET)
{
return false;
}
if( http_g_state==HTTP_STATE_WAIT_FOR_THREAD )
{
if( !Server->getThreadPool()->isRunning(request_ticket) )
@ -517,6 +535,7 @@ bool CHTTPClient::processRequest(void)
CHTTPSocket* socket_handler = new CHTTPSocket(name, gparams, http_params, pipe, endpoint);
request_ticket = Server->getThreadPool()->execute(socket_handler, "http websocket");
request_handler = socket_handler;
http_g_state = HTTP_STATE_WEBSOCKET;
return true;
}
else if( pl->size()>1 && (*pl)[0]=='x' && (*pl)[1]=='?' )

View File

@ -28,6 +28,9 @@ public:
static void init_mutex(void);
static void destroy_mutex(void);
virtual bool wantReceive(void);
virtual bool closeSocket(void);
private:
inline void processCommand(char ch);

View File

@ -14,19 +14,14 @@ void CHTTPSocket::operator()()
ParseParamStrHttp(gparams, &GET, true);
THREAD_ID tid = 0;
try
{
tid = Server->ExecuteWebSocket(name, GET, RawPARAMS, output, endpoint_name);
}
catch (...)
{
return;
}
tid = Server->ExecuteWebSocket(name, GET, RawPARAMS, output, endpoint_name);
if (tid == ILLEGAL_THREAD_ID)
{
std::string error = "Error: Unknown web socket [" + EscapeHTML(name) + "]";
Server->Log(error, LL_WARNING);
output->Write("Content-type: text/html; charset=UTF-8\r\n\r\n" + error);
delete output;
}
}

View File

@ -1192,6 +1192,11 @@ IPipe * InternetClient::connect(const SServerConnectionSettings & selected_serve
hostname = getuntil(":", hostname);
}
else if (hostname.find("/") != std::string::npos)
{
loc = getafter("/", hostname);
hostname = getuntil("/", hostname);
}
if (loc.empty() ||
loc[0] != '/')
@ -1214,7 +1219,7 @@ IPipe * InternetClient::connect(const SServerConnectionSettings & selected_serve
cs->Write(std::string("GET ")+loc+" HTTP/1.1\r\n"
"Host: "+hostname+"\r\n"
"Upgrade: websocket\r\n"
"Connection: upgrade\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: "+ websocket_key_str+"\r\n"
"Sec-WebSocket-Protocol: urbackup\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n");
@ -1278,6 +1283,11 @@ IPipe * InternetClient::connect(const SServerConnectionSettings & selected_serve
header += ch;
}
if (state == 4)
{
has_header = true;
}
}
} while (!has_header &&
Server->getTimeMS() - starttime < resp_timeout);
@ -1320,7 +1330,7 @@ IPipe * InternetClient::connect(const SServerConnectionSettings & selected_serve
return NULL;
}
if (header_map["connection"] != "upgrade")
if (strlower(header_map["connection"]) != "upgrade")
{
Server->Log("Unknown web socket connection value \"" + header_map["connection"] + "\"", LL_ERROR);
Server->destroy(cs);

View File

@ -44,6 +44,7 @@
<ClCompile Include="..\urbackupcommon\SparseFile.cpp" />
<ClCompile Include="..\urbackupcommon\TreeHash.cpp" />
<ClCompile Include="..\urbackupcommon\WalCheckpointThread.cpp" />
<ClCompile Include="..\urbackupcommon\WebSocketPipe.cpp" />
<ClCompile Include="ChangeJournalWatcher.cpp" />
<ClCompile Include="client.cpp" />
<ClCompile Include="clientdao.cpp" />
@ -104,6 +105,7 @@
<ClInclude Include="..\urbackupcommon\SparseFile.h" />
<ClInclude Include="..\urbackupcommon\TreeHash.h" />
<ClInclude Include="..\urbackupcommon\WalCheckpointThread.h" />
<ClInclude Include="..\urbackupcommon\WebSocketPipe.h" />
<ClInclude Include="ChangeJournalWatcher.h" />
<ClInclude Include="client.h" />
<ClInclude Include="clientdao.h" />

View File

@ -192,6 +192,9 @@
<ClCompile Include="..\urbackupcommon\CompressedPipeZstd.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\urbackupcommon\WebSocketPipe.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\urbackupcommon\sha2\sha2.h">
@ -350,8 +353,11 @@
<ClInclude Include="ClientHash.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="..\urbackupcommon\WebSocketPipe.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="..\urbackupcommon\CompressedPipeZStd.h">
<Filter>Quelldateien</Filter>
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
</Project>

View File

@ -1,23 +1,26 @@
#include "WebSocketPipe.h"
#include "WebSocketPipe.h"
#include "../Interface/Server.h"
#include <algorithm>
#include <assert.h>
#include "../stringtools.h"
inline WebSocketPipe::WebSocketPipe(IPipe* pipe, bool mask_writes, bool expect_read_mask, std::string pipe_add, bool destroy_pipe)
WebSocketPipe::WebSocketPipe(IPipe* pipe, const bool mask_writes, const bool expect_read_mask, std::string pipe_add, bool destroy_pipe)
: pipe(pipe), mask_writes(mask_writes), expect_read_mask(expect_read_mask), has_error(false),
pipe_add(pipe_add), read_state(EReadState_Header1), masking_key(0), destroy_pipe(destroy_pipe),
pipe_add(pipe_add), read_state(EReadState_Header1), destroy_pipe(destroy_pipe),
read_mutex(Server->createMutex()), write_mutex(Server->createMutex())
{
memset(masking_key, 0, sizeof(masking_key));
if (mask_writes)
{
char zero_mask[4] = {};
/*
* Just use a fixed non-random masking key. We are not a browser, so the security implications
* are a bit different
*/
while (masking_key == 0)
while (memcmp(zero_mask, masking_key, sizeof(masking_key)) == 0)
{
masking_key = Server->getRandomNumber();
Server->randomFill(masking_key, sizeof(masking_key));
}
}
}
@ -149,11 +152,11 @@ bool WebSocketPipe::Write(const char* buffer, size_t bsize, int timeoutms, bool
memcpy(new_buf.data(), header, header_pos);
char* mask_ptr = reinterpret_cast<char*>(&masking_key);
Server->Log("Masking key: " + convert(*((unsigned int*)masking_key)));
for (size_t i = 0; i < bsize; ++i)
{
size_t j = i % 4;
new_buf[header_pos + i] = buffer[i] ^ mask_ptr[j];
new_buf[header_pos + i] = buffer[i] ^ masking_key[j];
}
return pipe->Write(new_buf.data(), header_pos + bsize, timeoutms, flush);
@ -227,6 +230,18 @@ size_t WebSocketPipe::Read(std::string* ret, int timeoutms)
return 0;
}
bool WebSocketPipe::isReadable(int timeoutms)
{
{
IScopedLock lock(read_mutex.get());
if (!pipe_add.empty())
return true;
}
return pipe->isReadable(timeoutms);
}
size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, size_t* consumed_out)
{
size_t consumed = 0;
@ -284,10 +299,11 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
read_state = EReadState_HeaderMask;
remaining_size_bytes = 4;
consumed_size_bytes = 0;
curr_has_read_mask = true;
}
else
{
read_mask = 0;
curr_has_read_mask = false;
read_state = EReadState_Body;
}
}
@ -295,12 +311,14 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
{
remaining_size_bytes = 2;
consumed_size_bytes = 0;
payload_size = 0;
read_state = EReadState_HeaderSize2;
}
else if (tmp_payload_size == 127)
{
remaining_size_bytes = 8;
consumed_size_bytes = 0;
payload_size = 0;
read_state = EReadState_HeaderSize2;
}
else
@ -324,12 +342,13 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
if (has_read_mask())
{
read_state = EReadState_HeaderMask;
curr_has_read_mask = true;
remaining_size_bytes = 4;
consumed_size_bytes = 0;
}
else
{
read_mask = 0;
curr_has_read_mask = false;
read_state = EReadState_Body;
}
}
@ -340,7 +359,7 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
++consumed;
--remaining_size_bytes;
read_mask |= mask_byte << (consumed_size_bytes * 8);
read_mask[consumed_size_bytes] = mask_byte;
++consumed_size_bytes;
if (remaining_size_bytes == 0)
@ -363,11 +382,11 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
}
else if (out_off == consumed)
{
if (read_mask != 0)
if (curr_has_read_mask)
{
for (size_t i = 0; i < toread; ++i)
{
buffer[out_off] = buffer[out_off] ^ reinterpret_cast<char*>(&read_mask)[read_mask_idx % 4];
buffer[out_off] = buffer[out_off] ^ read_mask[read_mask_idx % 4];
++read_mask_idx;
++out_off;
}
@ -382,13 +401,14 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
else
{
assert(out_off < consumed);
if (read_mask != 0)
if (curr_has_read_mask)
{
for (size_t i = 0; i < toread; ++i)
{
buffer[out_off] = buffer[consumed] ^ reinterpret_cast<char*>(&read_mask)[read_mask_idx % 4];
buffer[out_off] = buffer[consumed] ^ read_mask[read_mask_idx % 4];
++read_mask_idx;
++out_off;
++consumed;
}
}
else
@ -420,7 +440,7 @@ size_t WebSocketPipe::consume(char* buffer, size_t bsize, int write_timeoutms, s
char msg[2];
unsigned char opcode = 10; //pong
msg[0] = opcode | (1 << 7);
msg[1] = (1<<7);
msg[1] = static_cast<char>(1<<7);
if (!pipe->Write(msg, 2, write_timeoutms, true))
{
has_error = true;

View File

@ -16,7 +16,7 @@ class WebSocketPipe : public IPipe
};
public:
WebSocketPipe(IPipe* pipe, bool mask_writes, bool expect_read_mask, std::string pipe_add, bool destroy_pipe);
WebSocketPipe(IPipe* pipe, const bool mask_writes, const bool expect_read_mask, std::string pipe_add, bool destroy_pipe);
~WebSocketPipe();
virtual size_t Read(char* buffer, size_t bsize, int timeoutms = -1);
@ -35,12 +35,9 @@ public:
}
virtual bool isWritable(int timeoutms = 0)
{
return pipe->isWritable();
}
virtual bool isReadable(int timeoutms = 0)
{
return pipe->isReadable();
return pipe->isWritable(timeoutms);
}
virtual bool isReadable(int timeoutms = 0);
virtual bool hasError(void)
{
return has_error || pipe->hasError();
@ -88,22 +85,22 @@ private:
size_t consume(char* buffer, size_t bsize, int write_timeoutms, size_t* consumed_out);
bool mask_writes;
bool expect_read_mask;
const bool mask_writes;
const bool expect_read_mask;
IPipe* pipe;
EReadState read_state;
std::vector<char> read_buffer;
unsigned char header_bits1;
unsigned char header_bits2;
uint64 payload_size;
size_t remaining_size_bytes;
size_t consumed_size_bytes;
unsigned int read_mask;
char read_mask[4];
bool curr_has_read_mask;
unsigned int read_mask_idx;
bool has_error;
std::string pipe_add;
unsigned int masking_key;
char masking_key[4];
bool destroy_pipe;
std::auto_ptr<IMutex> read_mutex;

View File

@ -49,9 +49,9 @@ void WebSocketConnector::Execute(str_map& GET, THREAD_ID tid, str_map& PARAMS, I
str_map::iterator it_forwarded_for = PARAMS.find("X-FORWARDED-FOR");
WebSocketPipe ws_pipe(pipe, false, true, std::string(), false);
WebSocketPipe* ws_pipe = new WebSocketPipe(pipe, false, true, std::string(), false);
client->Init(tid, &ws_pipe, it_forwarded_for != PARAMS.end() ? it_forwarded_for->second : endpoint_name);
client->Init(tid, ws_pipe, it_forwarded_for != PARAMS.end() ? it_forwarded_for->second : endpoint_name);
while (true)
{
@ -63,11 +63,11 @@ void WebSocketConnector::Execute(str_map& GET, THREAD_ID tid, str_map& PARAMS, I
if (client->wantReceive())
{
if (ws_pipe.isReadable(10))
if (ws_pipe->isReadable(10))
{
client->ReceivePackets(NULL);
}
else if (ws_pipe.hasError())
else if (ws_pipe->hasError())
{
client->ReceivePackets(NULL);
Server->wait(20);
@ -79,9 +79,15 @@ void WebSocketConnector::Execute(str_map& GET, THREAD_ID tid, str_map& PARAMS, I
}
}
bool want_destory_pipe = client->closeSocket();
wrapped_service->destroyClient(client);
if (want_destory_pipe)
{
delete ws_pipe;
delete pipe;
}
}
std::string WebSocketConnector::getName()

View File

@ -417,98 +417,6 @@ DLLEXPORT void LoadActions(IServer* pServer)
exit(0);
}
IPipe* pipe = Server->ConnectStream("192.168.239.142", 8080, 10000);
pipe->Write("GET / HTTP/1.1\r\n"
"Host: localhost\r\n"
"Upgrade: websocket\r\n"
"Connection: upgrade\r\n"
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
"Origin: http://localhost\r\n"
"Sec-WebSocket-Protocol: urbackup\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n");
char buf[512];
int64 resp_timeout = 30000;
int64 starttime = Server->getTimeMS();
int state = 0;
std::string header;
std::string pipe_add;
bool has_header = false;
do
{
_u32 read = pipe->Read(buf, sizeof(buf), 10000);
if (read > 0)
{
for (_u32 i = 0; i < read; ++i)
{
char ch = buf[i];
if (state == 0)
{
if (ch == '\r')
state = 1;
else if (ch == '\n')
state = 2;
else
state = 0;
}
else if (state == 1)
{
if (ch == '\n')
state = 2;
else
state = 0;
}
else if (state == 2)
{
if (ch == '\r')
state = 3;
else if (ch == '\n')
state = 4;
else
state = 0;
}
else if (state == 3)
{
if (ch == '\n')
state = 4;
else
state = 0;
}
else if (state == 4)
{
pipe_add.assign(buf + i, read - i);
has_header = true;
break;
}
header += ch;
}
}
} while (!has_header &&
Server->getTimeMS() - starttime < resp_timeout);
WebSocketPipe ws_pipe(pipe, true, false, pipe_add);
while (true)
{
std::string ret;
ws_pipe.Read(&ret);
Server->Log("Read " + ret);
if (ret == "something")
{
ws_pipe.Write("other");
}
}
exit(0);
std::string download_file=Server->getServerParameter("download_file");
if(!download_file.empty())
{

View File

@ -4326,7 +4326,7 @@ function getInternetSettings()
if(!I('internet_server_port')) return "";
if(I('internet_server_port').value.indexOf(";")==-1
&& !validate_text_int(["internet_server_port"]) ) return null;
if(!validate_text_regex([{ id: "internet_server", regexp: /(((;|^)(([\w-]+(\.[\w-]*)*)|((?!0)(?!.*\.)((1?\d?\d|25[0-5]|2[0-4]\d)(\.)){4})))+$)|(^$)/i }])) return null;
if(!validate_text_regex([{ id: "internet_server", regexp: /(((;|^)(([\w-]+(\.[\w-]*)*)|((?!0)(?!.*\.)((1?\d?\d|25[0-5]|2[0-4]\d)(\.)){4})))+$)|(^$)|(^(ws|wss):\/\/[\w-]+([\w-]*)+([\w.,@?^=%&amp;:\/~+#-]*[\w@?^=%&amp;\/~+#-])?$)/i }])) return null;
if(!validate_text_regex([{ id: "internet_server_proxy", regexp: /(^(http|https):\/\/[\w-]+([\w-]*)+([\w.,@?^=%&amp;:\/~+#-]*[\w@?^=%&amp;\/~+#-])?$)|(^$)/i }])) return null;
var pars="";
for(var i=0;i<g.internet_settings_list.length;++i)