komplizierter Java Queue Mechanismus

hasel80

Grünschnabel
Hallo,

ich habe ein Problem mit einer sehr komplexen Thread Thematik. Die Anforderung lautet, eine Queue zu bauen (FIFO), in die alle Client-Requests aufgenommen werden, alle x-Sekunden soll dann ein Thread für das jeweils erste Element in der Queue gestartet werden. Dieser Thread soll über RemoteShell ein Unix-Command auf einem anderen Rechner absetzen und das Ergebnis wieder zurückliefern. Sollte diese Routine mehr als y-Sekunden dauern, dann läuft der Thread in einen Timeout und der Request wird negativ beantwortet. In diesem Fall sollen dann auch alle anderen Requests, die bereits in der Queue vorhanden sind, negetiv beantwortet werden und erst nach z-Sekunden wieder neue Requests zugelassen werden.
Bitte beginnt hier kein Diskussion über die Sinnhaftigkeit dieser Anforderung. Diese ist leider fix.

Ich habe die Queue mit einer LinkedBlockingQueue umgesetzt. Sobald in der run-Methode starte ich dann einen neuen Thread für die Abarbeitung und einen für die Controlling Funktionalität. Wenn der Controller einen Timeout meldet, dann reagiere ich auf die Interrupted Exception und leere die Queue aus. Wäre das so einmal korrekt? Wie kann ich dann ein Ergebnis aus dem Shell-Command zurückgeben? Woher weiß ich, aus welchem Servlet der Request gekommen ist?

Ihr seht: Fragen über Fragen. Für jeden Hinweis bin ich dankbar.

Grüße
Hasel
 
Hallo,

schau mal hier:
Java:
/**
 * 
 */
package de.tutorials;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author Tom
 * 
 */
public class RequestProcessorExample {

    // ConcurrentLinkedQueue<E>

    /**
     * @param args
     */
    public static void main(String[] args) {
        final RequestProcessor requestProcessor = new RequestProcessor();

        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Producing requests...");
                while (true) {
                    Request request = new Request();
                    System.out.println("Produced request: " + request);
                    requestProcessor.getRequestQueue().add(request);
                    try {
                        TimeUnit.MILLISECONDS.sleep(500L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        requestProcessor.start();

    }

    static class RequestProcessor {
        ConcurrentLinkedQueue<Request> requestQueue;
        ExecutorService executorService;

        int pollTimeOutInSeconds = 5;
        int requestProcessingTimeOutInSeconds = 2;

        public RequestProcessor() {
            this.requestQueue = new ConcurrentLinkedQueue<Request>();
            this.executorService = Executors.newCachedThreadPool();
        }

        public void start() {
            System.out.println("Start RequestProcessor");
            while (true) {

                if (!requestQueue.isEmpty()) {
                    Request request = requestQueue.poll();
                    System.out.println("Handling request: " + request);
                    Response response = process(request);
                    // do something with response...
                }
                try {
                    TimeUnit.SECONDS.sleep(pollTimeOutInSeconds);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private Response process(Request request) {
            Response response = null;
            try {
                System.out.println("Process request: " + request);
                response = getExecutorService().submit(
                        new RequestHandler(request)).get(
                        requestProcessingTimeOutInSeconds, TimeUnit.SECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (null == response) {
                System.out
                        .println("Was not able to handle request: " + request);
                
                //request failed... remove all other elements from the queue...
                System.out.println("Clearing request queue...");
                requestQueue.clear();
                
            } else {
                System.out.println("Successfully handled request: " + request);
            }
            return response;
        }

        public ConcurrentLinkedQueue<Request> getRequestQueue() {
            return requestQueue;
        }

        public void setRequestQueue(ConcurrentLinkedQueue<Request> requestQueue) {
            this.requestQueue = requestQueue;
        }

        public ExecutorService getExecutorService() {
            return executorService;
        }

        public void setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }
    }

    static class RequestHandler implements Callable<Response> {

        Request request;

        public RequestHandler(Request request) {
            super();
            this.request = request;
        }

        @Override
        public Response call() throws Exception {
            try {
                TimeUnit.SECONDS.sleep((long) (Math.random() * 8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Response();
        }

        public Request getRequest() {
            return request;
        }

        public void setRequest(Request request) {
            this.request = request;
        }

    }

    static class Request {
        static int requestCount;
        int requestId = requestCount++;

        @Override
        public String toString() {
            return super.toString() + "[" + requestId + "]";
        }

    }

    static class Response {

    }

}

Siehe auch hier:
http://www.tutorials.de/forum/java/274056-fragen-threads-und-executorservice.html

Gruß Tom
 
Thread Pool und Queuemechanismus

Hallo,

ich habe ein ähnliches Problem.
Die Aufgabe besteht darin, dass ein "Masterthread" über einen Datenbankzugriff eine Treffermenge von IDs bekommt.
Aus diesen IDs werden Wrapper erstellt (ModelDataObjects), welche später noch mit Leben gefüllt werden sollen.
Diese Wrapper müssen dann von x Threads bearbeitet und via LPSQL Aufruf in XML umgewandelt werden.

Der eigentlich Haken an der Sache ist folgender:
Der Masterprozess schreibt die IDs in eine Art Queue (LinkedList)
Alle "WorkerThreads" laufen in einem Threadpool und holen sich die Aufgaben aus der Queue.
nachdem eine Aufgabe erfüllt wurde, wird diese aus der Queue entfernt und der Masterprozess benachrichtigt, dass der nächste Prozessabschnitt (nächster Arbeitsschritt z.B. XML in PDF umwandeln) ausgeführt werden kann

Leider habe ich mit Threads noch Nix gemacht und stocher diesbezüglich im Dunkeln herum. =(
Falls Jemand eine Schemenhafte Implementierung oder Idee liefern kann, wäre ich sehr dankbar.
Vor Allem ist mir der Mechanismus mit der Queue wichtig.

Vielen vielen Dank im Voraus!
 
Zurück