Windows-Sockets - Annehmen und Behandeln in getrennten Threads

Pain-maker

Mitglied
Hi @ all

Hab da mal wieder son Problem und glaube dass ich da nen Denkfehler hab... Nur leider komm ich nich drauf. Hab jetzt schon ewig rumprobiert, aber es klappt einfach nicht.
Ich poste mal zu Beginn einen Auszug des Quellcodes:
(Hab mal meine Versuche, bzw. die funktionierende Form auskommentiert mit gepostet)

PHP:
/**
 * startServerAsync
 */
bool MySocketServer::startServerAsync(void (*fkt)(MySocket *fkt, void *param), void *param)
{
	sockaddr_in  sa;
	UINT_PTR     ret;
	unsigned int i = 0;

	this->callback      = fkt;
	this->callbackParam = param;

	// IP konfigurieren
	memset(&sa, 0, sizeof(sa));
	sa.sin_family      = AF_INET;
	sa.sin_port        = htons(this->port);
	sa.sin_addr.s_addr = INADDR_ANY;
	
	// Neue Socket erstellen
	this->hndl = socket(AF_INET, SOCK_STREAM, 0);
	if(this->isValid() == false)
		throw "INVALID_SOCKET";

	// Socket an IP binden
	if(bind(this->hndl, (SOCKADDR*)&sa, sizeof(SOCKADDR_IN)) == SOCKET_ERROR)
	{
		closesocket(this->hndl);
		throw "INVALID_SOCKET";
	}

	// Auf eingehende Verbindungen warten
	if(listen(this->hndl, this->maxConnections) == SOCKET_ERROR)
	{
		printf("Fehler: listen, fehler code: %d\n", WSAGetLastError());
		return true;
	}
	
	// Clients initialisieren
	try
	{
		this->clients = new MySocket[this->maxConnections];
        this->clientWorker = new MySocketServerWorker[this->maxConnections];
		if(this->clients == NULL)
			throw 1;
		
		// Socket-Handles ungültig machen
		for(i=0; i<this->maxConnections; i++) 
			this->clients[i].setInvalid();
	}
	catch(...)
	{
		// Kein Speicher mehr verfügbar / Fehler bei der Speicherreservierung
		return false;
	}

	// Serverthread starten
	this->threadStatus = THREAD_RUNNING;
	ret = _beginthreadex(NULL, 0, &MySocketServer::serverThread, (void*)this, 0, NULL);
    if(ret != NULL)
	    ret = _beginthreadex(NULL, 0, &MySocketServer::acceptThread, (void*)this, 0, NULL);
	
    if(ret == NULL)
    {
        this->threadState = THREAD_STOPPED;
        return false;
    }
    return true;
}

/**
 * serverThread
 */
int MySocketServer::serverThread(void *ptr)
{
	MySocketServerWorker  *worker      = NULL;
	MySocketServer        *server      = (MySocketServer*)ptr; // this-Pointer
	SOCKET                clientSocket = INVALID_SOCKET;
	fd_set                *fdSet       = &server->fds;
	unsigned int          i            = 0;
	int                   ret          = 0;
    struct sockaddr_in    addr_in;
	socklen_t             addr_len     = sizeof(addr_in);
	
	while(server->threadStatus == THREAD_RUNNING)
	{	
        // Server-Loop
	    FD_ZERO(fdSet);              // Inhalt leeren
    	FD_SET(server->hndl, fdSet); // Den Socket der Verbindungen annimmt hinzufügen

		/*
		// Neue Verbindungen registrieren
		for(i=0; i<server->maxConnections; i++)
		{
			if(server->clients[i].isValid())
				FD_SET(server->clients[i].getHandle(), fdSet);
		}
		*/
		
		// Auf Änderungen prüfen
		ret = select(0, fdSet, NULL, NULL, NULL);
		if(ret == SOCKET_ERROR) 
		{
			//printf("Fehler: select(), error code: %d\n", WSAGetLastError());
			//return 0;
            continue;
		}

		// Server-Socket überprüfen -> Verbindung annehmen (sofern noch freie Verbindungen verfügbar sind)
		if(FD_ISSET(server->hndl, fdSet))
		{
			for(i=0; i<server->maxConnections; i++)
			{
				// Auf freien Worker prüfen und diesen sperren
				if(server->clientWorker[i].lock())
				{
					// Neuen Clienten annehmen
					clientSocket = accept(server->hndl, (struct sockaddr*)&addr_in, &addr_len);
					if(clientSocket != INVALID_SOCKET)
					{
						server->clientWorker[i].setClient(&server->clients[i]);
         		       	server->clientWorker[i].setParam(server->callbackParam);
                		server->clientWorker[i].setCallback(server->callback);
                        server->clients[i].setHandle(clientSocket);
					}
					else
					{
						server->clientWorker[i].unlock();
						printf("ERROR! Failed to accept client\n");
					}
					break;
				}
			}
			// Keine freien Verbindungen
			if(i == server->maxConnections)
			{
				printf("WARNING! Too many connections!\n");
			}
		}
		
		/*
        // Neue Anfragen beabreiten
		for(i=0; i<server->maxConnections; i++)
		{
			if(!server->clients[i].isValid())
				continue;

			// Anfrage verarbeiten
			if(FD_ISSET(server->clients[i].getHandle(), fdSet))
			{
				server->clientWorker[i].run();
			}
		}
		*/
	}

	// Serverthread beendet
	server->threadStatus = THREAD_STOPPED;
	return 0;
}

/**
 * acceptThread
 */
int MySocketServer::acceptThread(void *ptr)
{
	MySocketServerWorker  *worker      = NULL;
	MySocketServer        *server      = (MySocketServer*)ptr; // this-Pointer
	SOCKET                clientSocket = INVALID_SOCKET;
	unsigned int          i            = 0;
	int                   ret          = 0;
	fd_set                myFds;
	fd_set                *fdSet       = &myFds;
	//fd_set                *fdSet       = &server->fds;

	// Server-Loop
	while(server->threadStatus == THREAD_RUNNING)
	{
        FD_ZERO(fdSet); 
		
		// Neue Verbindungen registrieren
		for(i=0; i<server->maxConnections; i++)
		{
			if(server->clients[i].isValid())
				FD_SET(server->clients[i].getHandle(), fdSet);
		}

		// Auf Änderungen prüfen
		ret = select(0, fdSet, NULL, NULL, NULL);
		if(ret == SOCKET_ERROR) 
		{
			//printf("Fehler: select(), error code: %d\n", WSAGetLastError());
			//return 0;
            continue;
		}
        
		// Neue Anfragen beabreiten
		for(i=0; i<server->maxConnections; i++)
		{
			if(!server->clients[i].isValid())
				continue;

            // Anfrage verarbeiten
			if(FD_ISSET(server->clients[i].getHandle(), fdSet))
			{
				server->clientWorker[i].run();
			}
		}
	}

	// Serverthread beendet
	server->threadStatus = THREAD_STOPPED;
	return 0;
}

Ich starte einen Server, der dann in einem gesonderten Thread via select() neue Clienten annimmt und diese verarbeitet. Nun brauchte ich dabei immer 2 weitere Schleifen in der Hauptschleife, was die Annahme von neuen Clienten via accept() verlangsamte. Daher dachte ich ich könnte das einfach in einen gesonderten Thread auslagern, aber klappt leider nicht richtig, Bei langsam hintereinander gestellten Anfragen klappt alles, aber wenn ich das Tempo erhöhe oder mehrere Anfragen gleichzeitig stelle, dann bricht die Verbindung einiger Clienten beim Lesen ab. (Ich teste mit ApacheBenchmark)

Das Verarbeiten läuft hier ab:
(Ursprünglich sollte das mal in einem Threadpool ausgeführt werden)

PHP:
/**
 * run
 */
void MySocketServerWorker::run()
{		
    if(this->running == false)
    {
        this->running = true;

        // Anfrage behandeln
	    char dummy[512];
	    this->client->receiveBytes(dummy, sizeof(dummy));
  	    this->client->sendBytes("HTTP/1.0 200 OK\n\n");
	    this->client->close();
	    /*
        this->callback(this->client, this->param);
	    */
	    this->unlock();

        this->running = false;
    }
}

Nebenbei bin ich mir auch bei der Threadsicherheit nich mehr so ganz sicher.
Wäre jedem Dankbar, der mir mal ne Fehlerkorrektur und Aufklärung geben könnte :)

Mfg Pain-maker
 
Zuletzt bearbeitet:
Okay, ich bin jetzt wieder einen Schritt zurückgegangen und wollte erstmal den Threadpool austesten. Leider habe ich damit wieder dasselbe Problem wie oben...
Ich starte jetzt nur noch den serverThread() und select()e die Clienten dort. Danach gebe ich diese in meinen Threadpool, der sie dann verarbeitet.
Um auszuschließen, dass es am Pool liegt, habe ich das mal auf eine einfache Thread-Funktion ausgelagert --> Gleiches Problem.
Hab jetzt sogar noch ein weiteres Lock eingeführt um zu verhindern, dass ein Client >1 in den select() gerät, wenn er schon bearbeitet wird.

PHP:
static unsigned __stdcall threadHandleConnection(void *socketPtr)
{
	MySocketServerWorker *worker = (MySocketServerWorker*)socketPtr;

	worker->run();
	return 0;
}

int MySocketServer::serverThread(void *ptr)
{
	MySocketServerWorker  *worker      = NULL;
	MySocketServer        *server      = (MySocketServer*)ptr; // this-Pointer
	SOCKET                clientSocket = INVALID_SOCKET;
	fd_set                *fdSet       = &server->fds;
	unsigned int          i            = 0;
	int                   ret          = 0;
    struct sockaddr_in    addr_in;
	socklen_t             addr_len     = sizeof(addr_in);
	
	// Server-Loop
	while(server->threadStatus == THREAD_RUNNING)
	{	
		FD_ZERO(fdSet);              // Inhalt leeren
	    FD_SET(server->hndl, fdSet); // Den Socket der Verbindungen annimmt hinzufügen
        
		// Neue Verbindungen registrieren
		for(i=0; i<server->maxConnections; i++)
		{
			if(server->clients[i].isValid() && !server->clients[i].isLocked()) // Lock-Prüfung ob der Client bereits angenommen wurde
				FD_SET(server->clients[i].getHandle(), fdSet);
		}
		
		// Nächste Verbindung annehmen
		ret = select(0, fdSet, NULL, NULL, NULL);
		if(ret == SOCKET_ERROR) 
		{
			//printf("Fehler: select(), error code: %d\n", WSAGetLastError());
			//return 0;
            continue;
		}

		// Server-Socket überprüfen -> Verbindung annehmen (sofern noch freie Verbindungen verfügbar sind)
		if(FD_ISSET(server->hndl, fdSet))
		{
			for(i=0; i<server->maxConnections; i++)
			{
				// Auf freien Worker prüfen und diesen sperren
				if(server->clientWorker[i].lock())
				{
					// Neuen Clienten annehmen
					clientSocket = accept(server->hndl, (struct sockaddr*)&addr_in, &addr_len);
					if(clientSocket != INVALID_SOCKET)
					{
						server->clientWorker[i].setClient(&server->clients[i]);
         		       	server->clientWorker[i].setParam(server->callbackParam);
			      		server->clientWorker[i].setCallback(server->callback);
                        server->clients[i].setHandle(clientSocket);
						
						// Funktioniert nicht
						//server->clientWorker[i].run();
						//server->threadPool->run(&server->clientWorker[i], Low);
					}
					else
					{
						server->clientWorker[i].unlock();
						printf("ERROR! Failed to accept client\n");
					}
					break;
				}
			}
			// Keine freien Verbindungen
			if(i == server->maxConnections)
			{
				printf("WARNING! Too many connections!\n");
			}
		}
		
        // Neue Anfragen beabreiten
		for(i=0; i<server->maxConnections; i++)
		{
			if(!server->clients[i].isValid())
				continue;

			// Anfrage verarbeiten
			if(FD_ISSET(server->clients[i].getHandle(), fdSet))
			{
				if(server->clients[i].lock()) // Zusätzliche Schutz-Maßnahme (Hat leider nicht geholfen)
				{
					// Funktioniert
					//server->clientWorker[i].run();

					// Funktioniert nicht
					//server->threadPool->run(&server->clientWorker[i], Low);
					_beginthreadex(NULL, 0, &threadHandleConnection, &server->clientWorker[i], 0, NULL);
				}
			}
		}
	}

	// Serverthread beendet
	server->threadStatus = THREAD_STOPPED;
	return 0;
}

Was mache ich falsch? Hoffe hierbei kann mir vllt eher geholfen werden.
Wenn irgendwas missverständlich ist, erläutere ich auch gerne einige Dinge noch etwas genauer.

Mfg Pain-maker
 
Okay, ich hab mein Problem jetzt selber gelöst.
Zum einen hatte ich einen Fehler in meiner Socket-Klasse, da ich das closesocket() in eine while()-Schleife gekapselt hatte, was anscheinend die ganze Kommunikation beeinträchtigt hat.
PHP:
// FALSCH!
while(closesocket(this->hndl) != 0)
    Sleep(1);

// RICHTIG
closesocket(this->hndl); // Reicht vollkommen aus

Ausserdem ist es natürlich vollkommen unnötig einen angenommenen Clienten in das FD_SET mit aufzunehmen, da ich innerhalb eines Threads via recv() und send() Daten von der Socket lese / Daten sende und diese blockieren, wenn keine Daten vorhanden / zu senden sind.
Folgender Code läuft bei mir fehlerfrei und ohne jegliche Verbindungsabbrüche :D

PHP:
int MySocketServer::serverThread(void *ptr) 
{ 
    MySocketServerWorker  *worker      = NULL; 
    MySocketServer        *server      = (MySocketServer*)ptr; // this-Pointer 
    SOCKET                clientSocket = INVALID_SOCKET; 
    fd_set                *fdSet       = &server->fds; 
    unsigned int          i            = 0; 
    int                   ret          = 0; 
    struct sockaddr_in    addr_in; 
    socklen_t             addr_len     = sizeof(addr_in); 
     
    // Server-Loop 
    while(server->threadStatus == THREAD_STATE_RUNNING) 
    {     
        FD_ZERO(fdSet);              // Inhalt leeren 
        FD_SET(server->hndl, fdSet); // Den Socket der Verbindungen annimmt hinzufügen 
		
        // Nächste Verbindung annehmen 
        ret = select(0, fdSet, NULL, NULL, NULL); 
        if(ret == SOCKET_ERROR)  
        { 
            printf("Fehler: select(), error code: %d\n", WSAGetLastError()); 
            continue; 
        } 

        // Server-Socket überprüfen -> Verbindung annehmen (sofern noch freie Verbindungen (Slots) verfügbar sind) 
        if(FD_ISSET(server->hndl, fdSet)) 
        { 
			/**
			 * Möglicherweise könnte hier noch eine 2. Schleife stehen die dafür sorgt, dass gewartet wird bis ein
			 * Slot frei wird. (Spinner)
			 */
            for(i=0; i<server->maxConnections; i++) 
            { 
                // Auf freien Slot prüfen und diesen sperren 
                if(server->client[i].lock()) // Innerhalb des Threads wird bei Abschluss ::unlock() aufgerufen 
                { 
                    // Neuen Clienten annehmen 
                    clientSocket = accept(server->hndl, (struct sockaddr*)&addr_in, &addr_len); 
                    if(clientSocket != INVALID_SOCKET) 
                    { 
						// Client-Slot belegen
                        server->clients[i].setHandle(clientSocket); 
                        server->clientWorker[i].setClient(&server->clients[i]); 
                        server->clientWorker[i].setParam(server->callbackParam); 
                        server->clientWorker[i].setCallback(server->callback); 
                        
						// Anfrage behandeln (Aufgabe/Arbeiter-Objekt in einen Thread-Pool geben)
                        server->threadPool.run(&server->clientWorker[i]);
                    } 
                    else 
                    { 
                        server->client[i].unlock(); 
                        printf("ERROR! Failed to accept client\n"); 
                    } 
                    break; 
                } 
            } 
			
            // Keine freien Verbindungen 
            if(i == server->maxConnections) 
                printf("WARNING! Too many connections!\n"); 
        }
    } 

    // Serverthread beendet 
    server->threadStatus = THREAD_STATE_STOPPED; 
    return 0; 
}

Vielleicht hilft's mal jemandem...
Mfg Pain-maker
 
1. Wofür benutzt du bei nur einem Socket select, wenn du auch gleich die blockende socket.accept()-Funktion aufrufen könntest?
2. Warum benutzt du nicht für die anderen Sockets ebenfalls select, dann könntes du auf den ganzen Thread-Kram verzichten
3. Wenn hinter lock() und unlock() ein echtes Lock sitzt, dann ist das eigentlich weniger die normale Nutzweise dafür. Eher sowas wie:
C++:
{
   Lock lock(sock.getLock());
   if(sock.isFree()) { ... }
}
 
1. Wofür benutzt du bei nur einem Socket select, wenn du auch gleich die blockende socket.accept()-Funktion aufrufen könntest?
Berechtigte Frage ;) Ich habe festgestellt, dass es so einfach besser läuft als wenn ich nur mit accept() und einer Blocking Sockets arbeite. Zudem akzeptiere ich auf diese Weise nur Verbindungen wenn ich auch wirklich Platz dafür habe.

2. Warum benutzt du nicht für die anderen Sockets ebenfalls select, dann könntes du auf den ganzen Thread-Kram verzichten
Wenn ich alle Sockets in einem Thread mittels select behandle, bin ich viiiiiel langsamer im Vergleich zu meinem ThreadPool. Wenn mal 100 Anfragen gleichzeitig kommen, könnte ich alle nur nacheinander abarbeiten, so aber geht das parallel.

3. Wenn hinter lock() und unlock() ein echtes Lock sitzt, dann ist das eigentlich weniger die normale Nutzweise dafür
Nein, lock und unlock greift nur auf ein volatile bool zu. Damit prüfe ich lediglich ob der "Slot" frei ist.
 
Zuletzt bearbeitet:
Berechtigte Frage ;) Ich habe festgestellt, dass es so einfach besser läuft als wenn ich nur mit accept() und einer Blocking Sockets arbeite. Zudem akzeptiere ich auf diese Weise nur Verbindungen wenn ich auch wirklich Platz dafür habe.
Okay, aber dann solltest du wenigstens nen Moment warten, wenn du die Verbindung nicht angenommen hast, da du sonst ganz schnell in ne Schleife kommt, die dir die CPU frisst.

Wenn ich alle Sockets in einem Thread mittels select behandle, bin ich viiiiiel langsamer im Vergleich zu meinem ThreadPool. Wenn mal 100 Anfragen gleichzeitig kommen, könnte ich alle nur nacheinander abarbeiten, so aber geht das parallel.
Dann hast du das prinzip von select nicht verstanden. Mit select prüfst du auf all deinen Sockets welche lesbar und schreibbar sind und welche nicht. Auf diesen Sockets arbeitest du dann nicht blockierend. Dadurch läuft das bearbeiten der Sockets im Prinzip ebenfalls parallel ab. Dafür sparst du dir jedoch das erzeugen von Threads und du hast kein Threadlimit, welches bei ein paar hundert Threads übrigens schon erreicht sein kann. Auch bekommst du dann bei interaktionen zwischen den Clients keinerlei Syncronisationsprobleme durch Threading.


Nein, lock und unlock greift nur auf ein volatile bool zu. Damit prüfe ich lediglich ob der "Slot" frei ist.
Ich bezweifel dass das threadsafe ist.
 
Ich habe einen Thread-POOL wodurch ich KEINE Zeit zum Starten von Threads verblase.
Und das mache ich, WEIL ich das Prinzip von select() verstanden habe. Denn wenn ich alle Clienten in den FD_SET packen würde, müsste ich immer mindestens eine weitere Schleife ablaufen, die die Clienten prüft (siehe mein 1 / 2.Post in diesem Thread)
Und ich habe auch einen Kommentar für einen Spinner in der Endfassung hinterlassen, der dafür sorgt, dass eine Verbindung gehalten wird, bis ein Slot frei wird.
Und zu der Thread-safety des Locks: volatile IST Thread-safe.

Oder versteh ich dich einfach falsch?

PS: Villeicht sollte ich auch erwähnen das das ein Webserver (HTTP) ist.
 
Ich habe einen Thread-POOL wodurch ich KEINE Zeit zum Starten von Threads verblase.
Und das mache ich, WEIL ich das Prinzip von select() verstanden habe. Denn wenn ich alle Clienten in den FD_SET packen würde, müsste ich immer mindestens eine weitere Schleife ablaufen, die die Clienten prüft (siehe mein 1 / 2.Post in diesem Thread)
Ja müsstest du, was allerdings keinerlei Aufwand ist, da die meiste Zeit sowieso beim Aufruf von select beim Warten auf "Events" vergehen wird. Also etwa so
Code:
   select
   if server in readables:
       accept()
   for(client in clients)
       if client in readables:
          client.handle_read()
       if client in writables:
          client.handle_write()
was jetzt etwas an das Modul asyncore aus der python Bibliotek angelehnt ist. Simpeler Ansatz aber ganz gut nutzbar: http://docs.python.org/library/asyncore.html

Auch für einen Webserver ist das wunderbar. Stell dir 100 Threads im Threadpool vor, macht bei 1mb Stacksize schon über 100mb für den Webserver. Bei 100 Objekten und etwas select-magic sind wir vllt bei 1mb, die da verbraten werden. Die ganze Lösung skaliert im Prinzip einfach besser als die threadbasierte.
Das ganze soll keine Vorschrift sein, ich will dir damit nur 'nen Tipp geben, wie mans auch, und häufig besser, machen könnte.
 
Und zu der Thread-safety des Locks: volatile IST Thread-safe.

1. Kleiner Tipp: Dieses übertriebene GROSSSCHREIBEN wirkt nicht gerade freundlich und muss meiner Meinung nach auch nicht sein.

2. Zu volatile:
It tells the compiler that the object is subject to sudden change for reasons which cannot be predicted from a study of the program itself, and forces every reference to such an object to be a genuine reference.

Threadsicher ist das nicht.
 
Im Normalfall ist es das definitiv nicht, aber in seinem Fall ist es das aber. Nur ein Thread, nämlich der Server ließt diese Variable. Wohl etwa so in lock()
C++:
  if(!locked) locked = true;
  return locked;

Und nur ein einziger anderer Thread schreibt sie mit einem simplen:
C++:
   locked = false,

Da wir davon ausgehen können, dass das bool nur 1 byte ist und somit nicht "halb" geschrieben werden kann, fällt mir jetzt keine Situation ein, wo das zu problemen führen kann.

Aber es ist natürich definitv nicht sauber.
 
Zurück