Redis服务端头文件,定义服务端相关类和方法。
#ifndef REDIS_SERVER.H
#define REDIS_SERVER.H
#include<iostream>
#include<fstream>
#include<vector>
#include<thread>
#include<mutex>
#include<atomic>
#include<cassert>
#include<future>
#include<algorithm>
#include<functional>
#include<stdexcept>
#include<unistd.h>
#include<iomanip>
#include<chrono>
#include<ctime>
#include<signal.h>
#include<fcntl.h>
#include<cstring>
#include<queue>
#include<string>
#include "ParserFlyweightFactory.h"
using namespace std;
/**
 * @brief Command front end of the server, exposed as one shared instance.
 *
 * The constructor is private; callers obtain the object through
 * getInstance(). handleClient() turns one line of client text into a reply
 * string and keeps the multi/exec/discard transaction state in this object.
 */
class RedisServer
{
public:
    /// Returns a pointer to the single shared instance.
    static RedisServer* getInstance();

    /// Installs the SIGINT handler, then prints the logo file and the start-up messages.
    void start();

    /// Processes one whitespace-separated command line and returns the reply text.
    std::string handleClient(std::string receiveData);

private:
    /// Private so that instances are only created through getInstance().
    RedisServer(int port = 5555, const std::string& logFilePath = MY_PROJECT_DIR_LOGO);

    /// Signal callback registered by start().
    static void signalHandler(int sig);

    /// Prints the file at logFilePath line by line after placeholder substitution.
    void printLog();

    /// Prints the "Server started" and "ready to accept connections" lines.
    void printStartMessage();

    /// Replaces every occurrence of toReplaceText inside text, in place.
    void replaceText(std::string &text, const std::string &toReplaceText, const std::string &ReplaceText);

    /// Returns the current local time formatted as "YYYY-MM-DD HH:MM:SS".
    std::string getDate();

    /// Runs every queued command and returns the numbered replies.
    std::string executeTransaction(std::queue<std::string>& commandsQueue);

    // Data members. Their relative order is the initialization order; keep it.
    std::unique_ptr<ParserFlyweightFactory> flyweightFactory;  // source of command parsers
    int port;                                // substituted for PORT in printed messages
    std::atomic<bool> stop{false};           // not referenced by the member functions in this file
    pid_t pid;                               // process id, set from getpid() in the constructor
    std::string logFilePath;                 // path of the file printed by printLog()
    bool startMulti = false;                 // true between "multi" and "exec"/"discard"
    bool fallback = false;                   // set when an unknown command arrives inside a transaction
    std::queue<std::string> commandsQueue;   // raw command lines queued while a transaction is open
};
#endif
Redis服务端主逻辑实现文件,包括客户端连接管理和请求处理。
#include "RedisServer.h"
/// Returns the address of the shared RedisServer object.
RedisServer* RedisServer::getInstance() {
    // Function-local static: constructed with the default constructor
    // arguments on the first call and reused by every later call.
    static RedisServer instance;
    RedisServer* const handle = &instance;
    return handle;
}
/// Signal callback: on SIGINT, calls flush() on the helper returned by
/// CommandParser::getRedisHelper() and ends the process with status 0.
/// Any other signal number is ignored.
void RedisServer::signalHandler(int sig) {
    if (sig != SIGINT) {
        return;
    }
    // NOTE(review): exit() is not on the POSIX async-signal-safe list, and
    // flush() presumably performs I/O - confirm this is acceptable here.
    CommandParser::getRedisHelper()->flush();
    exit(0);
}
/// Prints the file at logFilePath to stdout, one line at a time, replacing
/// the placeholder "PORT" with the port number and "OTHREAD_ID" with the pid.
/// If the file cannot be opened, a single diagnostic line is printed instead.
void RedisServer::printLog() {
    std::ifstream logoFile(logFilePath);
    if (!logoFile.is_open()) {
        std::cout << "logoFilePath no exist" << std::endl;
        // Nothing can be read from an unopened stream, so stop here.
        return;
    }

    // The substitutions are the same for every line; build them once.
    const std::string portText = std::to_string(port);
    const std::string pidText = std::to_string(pid);

    for (std::string row; std::getline(logoFile, row);) {
        replaceText(row, "PORT", portText);
        replaceText(row, "OTHREAD_ID", pidText);
        std::cout << row << std::endl;
    }
}
/// Prints two start-up lines to stdout: "Server started." and the
/// "ready to accept connections" notice, each prefixed with the pid in
/// brackets and the current date from getDate().
void RedisServer::printStartMessage() {
    const std::string pidText = std::to_string(pid);

    // First line: pid and date only.
    std::string startedLine = "[PID] DATE # Server started.";
    replaceText(startedLine, "PID", pidText);
    replaceText(startedLine, "DATE", getDate());

    // Second line: port is substituted first, then pid and date.
    std::string readyLine = "[PID] DATE * The server is now ready to accept connections on port PORT";
    replaceText(readyLine, "PORT", std::to_string(port));
    replaceText(readyLine, "PID", pidText);
    replaceText(readyLine, "DATE", getDate());

    std::cout << startedLine << std::endl;
    std::cout << readyLine << std::endl;
}
/// Registers signalHandler for SIGINT, then prints the logo file followed by
/// the start-up messages.
void RedisServer::start() {
    ::signal(SIGINT, &RedisServer::signalHandler);
    printLog();
    printStartMessage();
}
/// Replaces every occurrence of `toReplaceText` in `text` with `newText`,
/// modifying `text` in place.
///
/// @param text          String to edit.
/// @param toReplaceText Substring to search for. An empty pattern leaves
///                      `text` unchanged.
/// @param newText       Replacement inserted for each occurrence.
void RedisServer::replaceText(std::string &text, const std::string &toReplaceText, const std::string &newText) {
    // find("") matches at every position, so an empty pattern would make the
    // loop below insert newText forever; treat it as "nothing to replace".
    if (toReplaceText.empty()) {
        return;
    }

    std::size_t pos = text.find(toReplaceText);
    while (pos != std::string::npos) {
        text.replace(pos, toReplaceText.length(), newText);
        // Resume after the inserted text so a replacement that itself
        // contains the pattern is not expanded again.
        pos = text.find(toReplaceText, pos + newText.length());
    }
}
/// Returns the current local time as text in the form "YYYY-MM-DD HH:MM:SS".
std::string RedisServer::getDate() {
    const std::time_t stamp =
        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

    // localtime_r writes the broken-down local time into caller-owned storage.
    std::tm parts;
    localtime_r(&stamp, &parts);

    // 32 bytes comfortably holds the 19-character result plus terminator.
    char text[32];
    const std::size_t written =
        std::strftime(text, sizeof text, "%Y-%m-%d %H:%M:%S", &parts);
    return std::string(text, written);
}
string RedisServer:: executeTransaction(std::queue<std::string>&commandsQueue){
std::vector<std::string>responseMessagesList;
while(!commandsQueue.empty()){
std::string receivedData = std::move(commandsQueue.front());
commandsQueue.pop();
std::istringstream iss(receivedData);
std::string command;
std::vector<std::string> tokens;
std::string responseMessage;
while (iss >> command) {
tokens.push_back(command);
}
command = tokens.front();
if (!tokens.empty()) {
command = tokens.front();
std::string responseMessage;
if(command=="quit"||command=="exit"){
responseMessage="stop";
return responseMessage;
}else if(command=="multi"){
responseMessage="Open the transaction repeatedly!";
responseMessagesList.emplace_back(responseMessage);
continue;
}else if(command == "exec"){
responseMessage="No transaction is opened!";
responseMessagesList.emplace_back(responseMessage);
continue;
}else{
std::shared_ptr<CommandParser> commandParser = flyweightFactory->getParser(command);
try {
responseMessage = commandParser->parse(tokens);
} catch (const std::exception& e) {
responseMessage = "Error processing command '" + command + "': " + e.what();
}
responseMessagesList.emplace_back(responseMessage);
}
}
}
string res = "";
for(int i=0;i<responseMessagesList.size();i++){
std::string responseMessage = std::to_string(i+1)+")"+responseMessagesList[i];
res += responseMessage;
if(i!=responseMessagesList.size()-1){
res+="\n";
}
}
return res;
}
/// Handles one line of client input and returns the reply text.
///
/// The line is split on whitespace; the first token selects the action:
///  - "quit" / "exit": reply "stop".
///  - "multi":   opens a transaction and empties the command queue
///               (reply "OK"), or reports that one is already open.
///  - "exec":    closes the transaction; runs the queued commands through
///               executeTransaction(), or replies with the EXECABORT error
///               when an unknown command was seen while queuing.
///  - "discard": clears the transaction flags, reply "OK".
///  - anything else: looked up in flyweightFactory. Outside a transaction the
///               command is parsed at once; inside one the raw line is
///               queued (reply "QUEUED").
///
/// An empty input yields "nil"; an input with no tokens yields "error".
string RedisServer::handleClient(string receivedData) {
    if (receivedData.empty()) {
        return "nil";
    }

    // Split the line into whitespace-separated words.
    std::vector<std::string> tokens;
    {
        std::istringstream words(receivedData);
        for (std::string word; words >> word;) {
            tokens.push_back(word);
        }
    }
    if (tokens.empty()) {
        return "error";
    }

    std::string command = tokens.front();

    if (command == "quit" || command == "exit") {
        return "stop";
    }

    if (command == "multi") {
        if (startMulti) {
            return "Open the transaction repeatedly!";
        }
        startMulti = true;
        // Start the new transaction with an empty queue.
        std::queue<std::string>().swap(commandsQueue);
        return "OK";
    }

    if (command == "exec") {
        if (!startMulti) {
            return "No transaction is opened!";
        }
        startMulti = false;
        if (fallback) {
            // An unknown command was rejected while queuing: abort.
            fallback = false;
            return "(error) EXECABORT Transaction discarded because of previous errors.";
        }
        return executeTransaction(commandsQueue);
    }

    if (command == "discard") {
        startMulti = false;
        fallback = false;
        return "OK";
    }

    // Ordinary command: both the immediate and the queued path need the parser lookup.
    std::shared_ptr<CommandParser> parser = flyweightFactory->getParser(command);
    const bool recognized = (parser != nullptr);

    if (startMulti) {
        if (!recognized) {
            fallback = true;
            return "Error: Command '" + command + "' not recognized.";
        }
        commandsQueue.emplace(receivedData);
        return "QUEUED";
    }

    if (!recognized) {
        return "Error: Command '" + command + "' not recognized.";
    }
    try {
        return parser->parse(tokens);
    }
    catch (const std::exception& e) {
        return "Error processing command '" + command + "': " + e.what();
    }
}
/// Builds the server object: creates the parser factory, stores the port and
/// the logo file path, and records the current process id.
///
/// @param port         Port number shown in the start-up output.
/// @param logoFilePath Path of the file printed by printLog().
RedisServer::RedisServer(int port, const std::string& logoFilePath)
    // Initializers are listed in member-declaration order, which is the
    // order in which they run regardless of how they are written here.
    : flyweightFactory(new ParserFlyweightFactory()),
      port(port),
      pid(getpid()),
      logFilePath(logoFilePath) {
}