//-----------------------------------------------------------------------------
//(c) 2007-2024 pavel.pimenov@gmail.com
//-----------------------------------------------------------------------------
#include <stdio.h>
#include "CDBManager.h"
#ifdef _WIN32
#define snprintf _snprintf
#else
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define SOCKET int
#define INVALID_SOCKET (-1)
#define SOCKET_ERROR (-1)
#define closesocket(x) close(x)
#endif
using std::string;
//==========================================================================
bool g_setup_log_disable_test_port = true;
bool g_setup_syslog_disable = true;
//==========================================================================
CFlyLogThreadInfoArray* CFlyServerContext::g_log_array = NULL;
//==========================================================================
sqlite_int64 get_tick_count()
{
#ifdef _WIN32 // Only in windows
LARGE_INTEGER l_counter;
QueryPerformanceCounter(&l_counter);
return l_counter.QuadPart;
//return GetTickCount64();
#else // Linux
struct timeval tim;
gettimeofday(&tim, NULL);
unsigned int t = ((tim.tv_sec * 1000) + (tim.tv_usec / 1000)) & 0xffffffff;
return t;
#endif // _WIN32
}
//==========================================================================
// ������ - 172.23.17.18:30002172.23.17.18: CID = $FLY-TEST-PORT MSL2NL7QB24PKECJEJFPGWY7S3TGTPMXPWDTWPA172.23.17.18:30002
static void set_socket_opt(SOCKET p_sock, int p_option, int p_val)
{
int len = sizeof(p_val); // x64 - x86 int ������ ������
if(setsockopt(p_sock, SOL_SOCKET, p_option, (char*)&p_val, len) < 0)
{
std::cout << "set_socket_opt option = "<< p_option << " val = " << p_val << " failed!\n";
}
}
//==========================================================================
static void send_udp_tcp_test_port(const std::string& p_PID, const std::string& p_CID, const std::string& p_ip, const string& p_port, bool p_is_tcp)
{
#ifdef FLYLINKDC_USE_TEST_PORT_PROMETHEUS
g_DB.flyserver_test_port_counter(p_is_tcp ? "tcp": "udp");
#endif
#ifdef _DEBUG
#ifdef _WIN32
if (p_is_tcp)
{
std::cout << "TCP test_port - ip = " << p_ip << ":" << p_port << " CID = " << p_CID << " PID = " << p_PID << std::endl;
}
else
{
std::cout << "UDP test_port - ip = " << p_ip << ":" << p_port << " CID = " << p_CID << " PID = " << p_PID << std::endl;
}
#endif
#endif
const unsigned short l_port = atoi(p_port.c_str());
const string l_header = "$FLY-TEST-PORT " + p_CID + p_ip + ':' + p_port + "|";
struct sockaddr_in addr = {0};
int l_result = 0;
SOCKET sock = socket(AF_INET, p_is_tcp ? SOCK_STREAM : SOCK_DGRAM, 0);
if (sock == INVALID_SOCKET)
{
std::cout << "send_udp_tcp_test_port - socket error! error code = " << errno << " CID = " << p_CID << std::endl;
return;
}
{
struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;
if (setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,
sizeof(timeout)) < 0)
std::cout << "setsockopt SO_RCVTIMEO failed\n";
if (setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout,
sizeof(timeout)) < 0)
std::cout << "setsockopt SO_SNDTIMEO failed\n";
}
addr.sin_family = AF_INET;
addr.sin_port = htons(l_port);
addr.sin_addr.s_addr = inet_addr(p_ip.c_str());
if (p_is_tcp)
{
int sz = sizeof(addr);
l_result = connect(sock, (struct sockaddr*) &addr, sz);
}
if (l_result == SOCKET_ERROR)
{
//std::cout << "connect error! error code = " << errno;
l_result = closesocket(sock);
if (l_result == SOCKET_ERROR)
{
std::cout << "send_udp_tcp_test_port - closesocket error! error code = " << errno << " CID = " << p_CID << std::endl;
}
return;
}
// TODO - add timeout for TCP
// http://stackoverflow.com/questions/4181784/how-to-set-socket-timeout-in-c-when-making-multiple-connections
l_result = sendto(sock, l_header.c_str(), l_header.size(), 0, (struct sockaddr*)&addr, sizeof(addr));
if (l_result == SOCKET_ERROR)
{
std::cout << "send_udp_tcp_test_port - sendto error! error code = " << errno << " CID = " << p_CID << std::endl;
}
l_result = closesocket(sock);
if (l_result == SOCKET_ERROR)
{
std::cout << "send_udp_tcp_test_port - closesocket error! error code = " << errno << " CID = " << p_CID << std::endl;
}
}
//==========================================================================
static void* thread_proc_udp_tcp_test_port(void* p_param)
{
static volatile LONG g_count_thread = 0;
static volatile unsigned g_count_all = 0;
CFlySafeGuard l_call_deep(g_count_thread); // TODO - ����� ������� ����� ������� ����� ��
string l_str_deep_call(LONG(l_call_deep), '*');
std::unique_ptr<CFlyPortTestThreadInfo> l_info(reinterpret_cast<CFlyPortTestThreadInfo*>(p_param));
for (int i = 0; i < l_info->m_ports.size(); ++i)
{
send_udp_tcp_test_port(l_info->m_PID, l_info->m_CID, l_info->m_ip, l_info->m_ports[i].first, l_info->m_ports[i].second);
//std::cout << "[" << l_str_deep_call << "]" <<
// "[" << ++g_count_all << "] " <<
// l_info->get_type_port(i) << " port-thread-test ip = " << l_info->m_ip << ":" << l_info->m_ports[i].first <<
// " CID = " << l_info->m_CID << " PID = " << l_info->m_PID << std::endl;
#ifndef _WIN32
if (g_setup_syslog_disable == false)
{
syslog(LOG_NOTICE, "[%s][%u] %s-port-thread-test %s:%s CID = %s PID = %s",
l_str_deep_call.c_str(),
++g_count_all,
l_info->get_type_port(i),
l_info->m_ip.c_str(),
l_info->m_ports[i].first.c_str(),
l_info->m_CID.c_str(),
l_info->m_PID.c_str());
}
#endif
}
return NULL;
}
//========================================================================================================
static string process_test_port(const CFlyServerContext& p_flyserver_cntx)
{
string l_result;
Json::Value l_root;
Json::Reader reader(Json::Features::strictMode());
const bool parsingSuccessful = reader.parse(p_flyserver_cntx.m_in_query, l_root);
if (!parsingSuccessful)
{
const char* l_error = "[FLY_POST_QUERY_TEST_PORT] Failed to parse json configuration";
std::cout << l_error << std::endl;
#ifndef _WIN32
syslog(LOG_ERR, "[FLY_POST_QUERY_TEST_PORT] Failed to parse json configuration - %s", l_error);
#endif
}
else
{
const Json::Value& l_udp = l_root["udp"];
const Json::Value& l_tcp = l_root["tcp"];
CFlyPortTestThreadInfo* l_info = NULL;
if (l_tcp.size() || l_udp.size())
{
const string l_CID = l_root["CID"].asString();
const string l_PID = l_root["PID"].asString();
l_info = new CFlyPortTestThreadInfo;
l_info->m_ip = p_flyserver_cntx.m_remote_ip;
l_info->m_CID = l_CID;
l_info->m_PID = l_PID;
l_info->m_ports.reserve(l_tcp.size() + l_udp.size());
}
if (l_info)
{
for (int j = 0; j < l_udp.size(); ++j)
{
const std::string l_port = l_udp[j]["port"].asString();
l_info->m_ports.push_back(std::make_pair(l_port, false));
}
for (int k = 0; k < l_tcp.size(); ++k)
{
const std::string l_port = l_tcp[k]["port"].asString();
l_info->m_ports.push_back(std::make_pair(l_port, true));
}
// �������� ����� ��� ����� TCP
if (mg_start_thread(thread_proc_udp_tcp_test_port, l_info))
{
delete l_info;
}
}
Json::Value l_test_port_result;
l_test_port_result["ip"] = p_flyserver_cntx.m_remote_ip;
l_result = l_test_port_result.toStyledString();
}
return l_result;
}
//==========================================================================
static void* thread_proc_store_log(void* p_param)
{
CFlyLogThreadInfoArray* l_p_array = (CFlyLogThreadInfoArray*)p_param;
for (CFlyLogThreadInfoArray::iterator i = l_p_array->begin(); i != l_p_array->end(); ++i)
{
const char* l_log_dir_name = NULL;
switch (i->m_query_type)
{
#ifdef FLY_SERVER_USE_FULL_LOCAL_LOG
case FLY_POST_QUERY_LOGIN:
if (!g_setup_log_disable_login)
l_log_dir_name = "log-login";
break;
case FLY_POST_QUERY_GET:
l_log_dir_name = "log-get";
break;
#endif
case FLY_POST_QUERY_TEST_PORT:
//if (!g_setup_log_disable_test_port)
// l_log_dir_name = "log-test-port";
break;
}
if (l_log_dir_name)
{
const string l_file_name = CFlyServerContext::get_json_file_name(l_log_dir_name, i->m_remote_ip.c_str(), i->m_now);
std::fstream l_log_json(l_file_name.c_str(), std::ios_base::out | std::ios_base::trunc);
if (!l_log_json.is_open())
{
std::cout << "Error open file: " << l_file_name << " errno = " << errno << "\r\n";
#ifndef _WIN32
syslog(LOG_ERR, "Error open file: = %s errno = %d", l_file_name.c_str(), errno);
#endif
}
else
{
if (i->m_in_query.length())
{
#ifdef FLYLINKDC_USE_TEST_PORT_PROMETHEUS
g_DB.flyserver_log_counter(l_log_dir_name, i->m_in_query.length());
#endif
l_log_json.write(i->m_in_query.c_str(), i->m_in_query.length());
if (l_log_json.fail() || !l_log_json.good())
{
std::cout << "Error: failed to write to l_log_json!" << l_file_name << " errno = " << errno <<"\r\n";
#ifndef _WIN32
syslog(LOG_ERR, "Error: failed to write to l_log_json! file = %s errno = %d", l_file_name.c_str(), errno);
#endif
}
}
else
{
std::cout << "Error: len(=0) for log file: " << l_file_name << " errno = " << errno << "\r\n";
#ifndef _WIN32
syslog(LOG_ERR, "Error: len(=0) for log file = %s errno = %d", l_file_name.c_str(), errno);
#endif
}
#ifdef _DEBUG
if (i->m_query_type != FLY_POST_QUERY_TEST_PORT)
{
// TODO - ��������� ��� ������������
//if (!l_p->l_flyserver_cntx.m_res_stat.empty())
//{
// l_log_json << std::endl << "OUT:" << std::endl << l_flyserver_cntx.m_res_stat;
//}
}
#endif
}
}
}
std::cout << std::endl << "Flush log files count: " << l_p_array->size() << "\r\n";
#ifndef _WIN32
syslog(LOG_NOTICE, "Flush log files count: = %d", int(l_p_array->size()));
#endif
delete l_p_array;
return NULL;
}
//==========================================================================
void CFlyServerContext::run_db_query(const char* p_content, size_t p_len, CDBManager& p_DB)
{
zlib_uncompress((uint8_t*)p_content, p_len, m_decompress);
m_tick_count_start_db = get_tick_count();
init_in_query(p_content, p_len);
#ifdef MT_DEBUG
if (m_query_type == FLY_POST_QUERY_GET && m_decompress.size() != p_len)
{
std::ofstream l_fs;
static int g_id_file;
l_fs.open(std::string("flylinkdc-extjson-zlib-file-" + toString(++g_id_file) + ".json.zlib").c_str(), std::ifstream::out);
#ifdef __linux__
// l_fs.open("flylinkdc-extjson-zlib-file-" + toString(++g_id_file) + ".json.zlib", std::ifstream::out);
#else
// l_fs.open("flylinkdc-extjson-zlib-file-" + toString(++g_id_file) + ".json.zlib", std::ifstream::out | std::ifstream::binary);
#endif
l_fs.write(p_content, p_len);
}
#endif // MT_DEBUG
if (m_query_type == FLY_POST_QUERY_TEST_PORT)
{
m_res_stat = process_test_port(*this);
}
m_tick_count_stop_db = get_tick_count();
run_thread_log();
comress_result();
}
//==========================================================================
void CFlyServerContext::send_syslog() const
{
extern unsigned long long g_sum_out_size;
extern unsigned long long g_sum_in_size;
extern unsigned long long g_z_sum_out_size;
extern unsigned long long g_z_sum_in_size;
extern unsigned long long g_count_query;
if (g_setup_syslog_disable == false)
{
char l_log_buf[512];
l_log_buf[0] = 0;// ���� ����������� 152-160
char l_buf_cache[32];
l_buf_cache[0] = 0;
char l_buf_counter[64];
l_buf_counter[0] = 0;
if (m_count_cache)
{
snprintf(l_buf_cache, sizeof(l_buf_cache), "[cache=%u]", (unsigned) m_count_cache);
}
if (m_count_get_only_counter == 0 && m_count_get_base_media_counter == 1 && m_count_get_ext_media_counter == 1)
{
snprintf(l_buf_counter, sizeof(l_buf_counter), "%s","[get full Inform!]");
}
else if (m_count_get_base_media_counter != 0 || m_count_get_ext_media_counter != 0 || m_count_insert != 0)
{
snprintf(l_buf_counter, sizeof(l_buf_counter), "[cnt=%u,base=%u,ext=%u,new=%u]",
(unsigned)m_count_get_only_counter,
(unsigned)m_count_get_base_media_counter,
(unsigned)m_count_get_ext_media_counter,
(unsigned)m_count_insert);
}
if (m_query_type == FLY_POST_QUERY_TEST_PORT)
{
snprintf(l_log_buf, sizeof(l_log_buf), "[%s][%c][%u][%s][%s][in:%u/%u][c:%u][%s]",
m_fly_response.c_str(),
get_compress_flag(),
(unsigned)m_count_file_in_json,
m_uri.c_str(),
m_remote_ip.c_str(),
(unsigned)get_real_query_size(),
(unsigned)m_content_len,
(unsigned)g_count_query,
m_user_agent.c_str()
);
}
else
{
snprintf(l_log_buf, sizeof(l_log_buf), "[%s][%c][%u]%s[%s][%s][%u/%u->%u/%u][time db=%u][%s]%s%s",
m_fly_response.c_str(),
get_compress_flag(),
(unsigned)m_count_file_in_json,
"",
m_uri.c_str(),
m_remote_ip.c_str(),
(unsigned)get_real_query_size(),
(unsigned)m_content_len,
(unsigned)m_res_stat.size(),
(unsigned)get_http_len(),
(unsigned)get_delta_db(),
m_user_agent.c_str(),
l_buf_cache,
l_buf_counter
);
}
std::cout << ".";
static int g_cnt = 0;
if ((++g_cnt % 30) == 0)
std::cout << std::endl;
#ifndef _WIN32 // Only in linux
syslog(LOG_NOTICE, "%s", l_log_buf);
#endif
std::cout << l_log_buf << std::endl;
}
}
//==========================================================================
void CFlyServerContext::flush_log_array(bool p_is_force)
{
if (g_log_array)
{
#ifndef _DEBUG
if (g_log_array->size() > 50 || p_is_force)
#else
if (g_log_array->size() > 0 || p_is_force)
#endif
{
CFlyLogThreadInfoArray* l_log_array = g_log_array;
g_log_array = NULL;
if (mg_start_thread(thread_proc_store_log, l_log_array))
{
thread_proc_store_log(l_log_array); // ��������� ��� ������ � �������� l_thread_param
}
}
}
}
//==========================================================================
void CFlyServerContext::run_thread_log()
{
if (is_valid_query())
{
if (!m_in_query.empty() &&
(m_query_type == FLY_POST_QUERY_TEST_PORT && g_setup_log_disable_test_port == false)
) // ���� ������� ������ � ��� ������� ����� ��������� �� ����
{
if (!g_log_array)
{
g_log_array = new CFlyLogThreadInfoArray;
}
g_log_array->push_back(CFlyLogThreadInfo(m_query_type, m_remote_ip, m_in_query));
flush_log_array(false);
}
}
else
{
const char* l_log_text = "l_query_type == FLY_POST_QUERY_TEST_PORT";
#ifdef _DEBUG
std::cout << l_log_text << std::endl;
#ifndef _WIN32
syslog(LOG_NOTICE, "%s", l_log_text);
#endif
#endif // _DEBUG
}
}
//========================================================================================================
bool zlib_compress(const char* p_source, size_t p_len, std::vector<unsigned char>& p_compress, int& p_zlib_result, int p_level /*= 9*/)
{
auto l_dest_length = compressBound(p_len) + 2;
p_compress.resize(l_dest_length);
p_zlib_result = compress2(p_compress.data(), &l_dest_length, (uint8_t*)p_source, p_len, p_level);
if (p_zlib_result == Z_OK) // TODO - Check memory
{
#ifdef _DEBUG
if (l_dest_length)
{
std::cout << std::endl << "Compress zlib size " << p_len << "/" << l_dest_length << std::endl;
}
#endif
p_compress.resize(l_dest_length);
}
else
{
p_compress.clear();
}
return !p_compress.empty();
}
//========================================================================================================
bool zlib_uncompress(const uint8_t* p_zlib_source, size_t p_zlib_len, std::vector<unsigned char>& p_decompress)
{
auto l_decompress_size = p_zlib_len * 3;
if (p_zlib_len >= 2 && p_zlib_source[0] == 0x78) // zlib �������?
// && (unsigned char)p_zlib_source[1] == 0x9C ���� ��� ����� ���� ������
{
// ������� ���������� - ��� ������� ����� (������� �������)
// 1 - 0x01
// [2-5] - 0x5e
// 6 - 0x9c
// [7-9] - 0xda
// #ifdef _DEBUG
// char l_dump_zlib_debug[10] = {0};
// sprintf(l_dump_zlib_debug, "%#x", (unsigned char)l_post_data[1] & 0xFF);
// std::cout << "DEBUD zlib decompress header l_post_data[1] = " << l_dump_zlib_debug << std::endl;
// #endif
p_decompress.resize(l_decompress_size);
while (1)
{
const int l_un_compress_result = uncompress(p_decompress.data(), &l_decompress_size, p_zlib_source, p_zlib_len);
if (l_un_compress_result == Z_BUF_ERROR)
{
l_decompress_size *= 2;
p_decompress.resize(l_decompress_size);
continue;
}
if (l_un_compress_result == Z_OK)
{
p_decompress.resize(l_decompress_size);
}
else
{
p_decompress.clear(); // ���� ������ - �������� ������. ������ ������ �������� �������.
// TODO ����������� � ��������� ������� ������.
std::cout << "Error zlib_uncompress: code = " << l_un_compress_result << std::endl;
#ifndef _WIN32
syslog(LOG_ERR, "Error zlib_uncompress: code = %d", l_un_compress_result);
#endif
}
break;
};
}
return !p_decompress.empty();
}
//========================================================================================================
void CDBManager::init()
{
#ifndef _WIN32
openlog("fly-server-test-port", 0, LOG_USER); // LOG_PID
syslog(LOG_NOTICE, "CDBManager init");
#endif
}
//========================================================================================================
void CDBManager::shutdown()
{
}
//========================================================================================================
CDBManager::~CDBManager()
{
std::cout << std::endl << "* fly-server-test-port CDBManager::~CDBManager" << std::endl;
#ifndef _WIN32
syslog(LOG_NOTICE, "CDBManager destroy!");
closelog();
#endif
std::cout << std::endl << "* fly-server-test-port CDBManager destroy!" << std::endl;
}
↑ V547 Expression 'l_log_dir_name' is always false.