Fragen: Threads und ExecutorService

takidoso

Erfahrenes Mitglied
Hallo und Halli,
Ich habe im "Java ist auch eine Insel" etwas über die in Java 5 neu existierenden Schnittstellen gelesen.
Was mich nun interessiert ist einerseits die Möglichkeit mit dem Executors Thread-Pools zu verwenden, andererseits soll mein "aufrufender Thread", der weitere Threads einen solchen Thread-Pool aufruft dann warten bis alle fertig sind.
Dazu wird offenbar die Routine join() aus den jeweiligen Threads verwendet.

Meine Frage nun: kann man diese join Routine tatsächlich verwenden wenn die Threads über den Executors-Thread-Pool angestoßen werden?
Wie komme ich an diese ran?

Ich hatte mir gedacht, dass man einen fixen Pool vielleicht verwenden kann um die Anzahl von pallellaufenden Prozessen begrenzen zu können um verhindern zu können, dass die Performance des Rechners potenziell in die Knie geht, schließlich liefe au ihm noch anderer Kram der ncht behindert werden sollte.

Kann mir da jemand einen Tip geben, ob mein Vorhaben mittels Executors.newFixedThreadPool(int) und Thread.join() gelingen könnte, und wie es prinzipiell aufgebaut sein müsste?
Es gibt zwar auch invokeAll und ein invokeAny bei ExecutorService, aber irgendwie verstehe ich das dort so, dass es passieren kann, dass nicht alle Trheads freiwillig enden würden, oder verstehe ich da was falsch?

mit bestem Dank im Voraus

Takidoso
 
Hallo,

der Vorteil an der Verwendung der neuen Concurrent Geschichten wie ExecutorService etc. ist ja gerade der, dass mans ich nicht mehr mit low-level Threading abmühen muss...

Wenn du auf die fertzigstellung von mehreren Tasks wartest könntest du das Beispeislweise so machen:
Java:
/**
 * 
 */
package de.tutorials;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author Thomas.Darimont
 * 
 */
public class JoinTasksExample {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        final CountDownLatch countdownLatch = new CountDownLatch(5);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println(Thread.currentThread().getName()
                                + ": " + i);
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                countdownLatch.countDown();
            }
        };

        Runnable otherTask = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()
                        + ":waiting for completion of other tasks...");
                try {
                    countdownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()
                        + ": Execute other task");

            }
        };

        for (int i = 0; i < 5; i++) {
            executorService.execute(task);
        }
        
        executorService.execute(otherTask);

    }
}

Ausgabe:
Code:
pool-1-thread-1: 0
pool-1-thread-3: 0
pool-1-thread-5: 0
pool-1-thread-2: 0
pool-1-thread-4: 0
pool-1-thread-6:waiting for completion of other tasks...
pool-1-thread-1: 1
pool-1-thread-4: 1
pool-1-thread-2: 1
pool-1-thread-5: 1
pool-1-thread-3: 1
pool-1-thread-1: 2
pool-1-thread-5: 2
pool-1-thread-3: 2
pool-1-thread-4: 2
pool-1-thread-2: 2
pool-1-thread-1: 3
pool-1-thread-5: 3
pool-1-thread-3: 3
pool-1-thread-4: 3
pool-1-thread-2: 3
pool-1-thread-1: 4
pool-1-thread-5: 4
pool-1-thread-2: 4
pool-1-thread-4: 4
pool-1-thread-3: 4
pool-1-thread-1: 5
pool-1-thread-3: 5
pool-1-thread-2: 5
pool-1-thread-4: 5
pool-1-thread-5: 5
pool-1-thread-1: 6
pool-1-thread-3: 6
pool-1-thread-2: 6
pool-1-thread-5: 6
pool-1-thread-4: 6
pool-1-thread-1: 7
pool-1-thread-3: 7
pool-1-thread-2: 7
pool-1-thread-4: 7
pool-1-thread-5: 7
pool-1-thread-1: 8
pool-1-thread-3: 8
pool-1-thread-5: 8
pool-1-thread-4: 8
pool-1-thread-2: 8
pool-1-thread-1: 9
pool-1-thread-3: 9
pool-1-thread-2: 9
pool-1-thread-4: 9
pool-1-thread-5: 9
pool-1-thread-6: Execute other task

Gruß Tom
 
Hallo Tom,
danke für Deinen Vorschlag bzw Beispiel.
ich habe nun weiter versucht die Kommentare und z.Z. den Sourcecode bezüglich der Excutors und ExecutorService zu verstehen und habe mal folgenden Code zusammen gebaut, wobei mich da etwas irritiert...
Code:
package de.equens.filedemon.data;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import de.cmk.util.FileModifiedTimeComparator;
import de.equens.filedemon.FileProcessor;


public class FileProcessorController
{
	List <FileProcessor>      m_fileProcessors = new ArrayList<FileProcessor>();
	Map  <File,FileProcessor> m_fileFPMap      = new HashMap<File,FileProcessor>();


    public void run()
    {

        boolean stillRunning = true; //TODO muss bezüglich Status des Controllers abbrechen oder weitergeführt weden
        while (stillRunning)
        {
        	SortedSet<File> sortedFiles = new TreeSet<File>(new FileModifiedTimeComparator());
        	for (FileProcessor fp : m_fileProcessors)
        	{
        	  File[] files = fp.getInputFiles();
        	  for (File f : files)
        	  {
        	    sortedFiles.add(f);
        	    m_fileFPMap.put(f,fp);
        	  }        		
        	}
        
        	
        	try
        	{
                   String acName = m_fileProcessors.get(0).getACName4SeqCheck();
                   if (acName != null && acName.length() > 0)
                   {
                      Set<File> files = m_fileFPMap.keySet();
                       for (File f : files)
                       {
                           FileProcessor fp = m_fileFPMap.get(f);
                           fp.checkInputNameOnSeqNum(f.getName());
                           fp.prepareInputFile4Processing(f);
                      }
                  }
				

		ExecutorService service = Executors.newFixedThreadPool(5);
		List <Future<FileProcessorCallResult>> results = service.invokeAll(m_fileProcessors);				
        	}
        	catch (Exception e)
        	{
        		// TODO ...
        	}

und zwar ist da ein merkwürdiger Compilefehler an dem dickgeruckten Code der da sagt
The method invokeAll(Collection<Callable<T>>) in the type ExecutorService is not applicable for
the arguments (List<FileProcessor>)

Das verstehe ich nicht so ganz, denn ein List-Object ist doch gleichzeitig ein Collection-Object. wwarum wird denn da gemosert?

jetz mal kurz zum Aufbau: ich habe bestimmte "Tasks" (FileProcessor) die vom Typ Callable sind und gebündelt durch invokeAll() aufgerufen werden sollen. dem Code von invokeAll(Collection) nach zu urteilen wird offenbar gewartet, bis alle durgelaufen sind oder terminiert wurden oder mit einer Exception sich beendet haben. Ich habe leider noch keine Ahnung wo ich die Exception eines Callables erkennen kann :confused:, denn in der java-Doku steht folgendes:
Code:
... ExecutorService ...
/**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete. 
     * {@link Future#isDone} is <tt>true</tt> for each 
     * element of the returned list.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     * @param tasks the collection of tasks
     * @return A list of Futures representing the tasks, in the same
     * sequential order as produced by the iterator for the given task
     * list, each of which has completed.
     * @throws InterruptedException if interrupted while waiting, in
     * which case unfinished tasks are cancelled.
     * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
     * @throws RejectedExecutionException if any task cannot be scheduled
     * for execution
     */

    <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
        throws InterruptedException;
Leider gab es im Interface Futur und FutureTask auch keine Hinweise die mich weiterbrachten :mad:

für weitere Denkanstöße bin ich dankbar

Takidoso
 
Hallo,

wie wärs denn hiermit:
Java:
/**
 * 
 */
package de.tutorials;

import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author Thomas.Darimont
 * 
 */
public class FileProcessingService {

    /**
     * @param args
     */
    public static void main(String[] args) {

        final List<Future<FileProcessingResult>> fileProcessingResults = new CopyOnWriteArrayList<Future<FileProcessingResult>>();
        final ExecutorService executorService = Executors.newCachedThreadPool();

        fileProcessingResults.add(executorService.submit(new FileProcessor(
                "a.data")));
        fileProcessingResults.add(executorService.submit(new FileProcessor(
                "b.data")));
        fileProcessingResults.add(executorService.submit(new FileProcessor(
                "c.data")));

        System.out.println(fileProcessingResults);

        // Result fetcher...
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (!fileProcessingResults.isEmpty()) {
                    System.out.println("Fetching...");
                    for (Future<FileProcessingResult> future : fileProcessingResults) {
                        FileProcessingResult result = null;
                        try {
                            result = future.get(500, TimeUnit.MILLISECONDS);
                            System.out.println("Future returned:" + result);
                            if (null != result) {
                                System.out
                                        .println("removing future: " + future);
                                fileProcessingResults.remove(future);
                            }
                        } catch (TimeoutException timeoutException) {
                            System.out.println("Future: " + future
                                    + " not completed yet");

                        } catch (Exception exception) {
                            exception.printStackTrace();
                        }
                    }

                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                executorService.shutdown();
            }
        });
    }

    static class FileProcessor implements Callable<FileProcessingResult> {

        String file;

        FileProcessingResult result;

        /**
         * @param file
         */
        public FileProcessor(String file) {
            super();
            this.file = file;
        }

        /**
         * @return the file
         */
        public String getFile() {
            return file;
        }

        /**
         * @param file
         *            the file to set
         */
        public void setFile(String file) {
            this.file = file;
        }

        /**
         * @return the result
         */
        public FileProcessingResult getResult() {
            return result;
        }

        /**
         * @param result
         *            the result to set
         */
        public void setResult(FileProcessingResult result) {
            this.result = result;
        }

        public void process(String file) {
            System.out.println("Processing: " + file);
            doSomeReallyComplicatedFileProcessingOperations();
            setResult(new FileProcessingResult(file));
        }

        private final static Random RANDOM = new Random();
        private void doSomeReallyComplicatedFileProcessingOperations() {
            try {
                long sleepTime = (long) (10L + RANDOM.nextInt(20));
                System.out.printf("sleeping: %s seconds\n", sleepTime);
                TimeUnit.SECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public FileProcessingResult call() throws Exception {
            process(getFile());
            return getResult();
        }
    }

    static class FileProcessingResult {
        String file;

        /**
         * @param file
         */
        public FileProcessingResult(String file) {
            super();
            this.file = file;
        }

        @Override
        public String toString() {
            return super.toString() + ": " + file;
        }
    }
}

Ausgabe:
Code:
Processing: a.data
sleeping: 10 seconds
Processing: c.data
sleeping: 22 seconds
Processing: b.data
sleeping: 21 seconds
[java.util.concurrent.FutureTask@157f0dc, java.util.concurrent.FutureTask@863399, java.util.concurrent.FutureTask@a59698]
Fetching...
Future: java.util.concurrent.FutureTask@157f0dc not completed yet
Future: java.util.concurrent.FutureTask@863399 not completed yet
Future: java.util.concurrent.FutureTask@a59698 not completed yet
Fetching...
Future: java.util.concurrent.FutureTask@157f0dc not completed yet
Future: java.util.concurrent.FutureTask@863399 not completed yet
Future: java.util.concurrent.FutureTask@a59698 not completed yet
Fetching...
Future returned:de.tutorials.FileProcessingService$FileProcessingResult@1af9e22: a.data
removing future: java.util.concurrent.FutureTask@157f0dc
Future: java.util.concurrent.FutureTask@863399 not completed yet
Future: java.util.concurrent.FutureTask@a59698 not completed yet
Fetching...
Future: java.util.concurrent.FutureTask@863399 not completed yet
Future: java.util.concurrent.FutureTask@a59698 not completed yet
Fetching...
Future returned:de.tutorials.FileProcessingService$FileProcessingResult@b6ece5: b.data
removing future: java.util.concurrent.FutureTask@863399
Future returned:de.tutorials.FileProcessingService$FileProcessingResult@17ace8d: c.data
removing future: java.util.concurrent.FutureTask@a59698

Gruß Tom
 
Hallo Tom,
das ist eine ganz interessante Vairante, die Du da gezaubert hast.
allerdings verstehe ich bei meinem Versuch den Compile Error nicht
in der Zeile
Code:
List <Future<FileProcessorCallResult>> results = service.invokeAll(m_fileProcessors);

denn meine "FileProcessorren" sind:
Code:
List <FileProcessor>  m_fileProcessors = new ArrayList<FileProcessor>();

und FileProcessor selbst ist:
Code:
public class FileProcessor implements Callable<FileProcessorCallResult>, FileProcessorStates

List ist ganz offiziell
Code:
public interface List<E> extends Collection<E>
:

Also was mache ich bei den Generics falsch? Was muss ich da ändern damit kein Compile-Fehler an dieser Stelle auftritt, der da lautet
The method invokeAll(Collection<Callable<T>>) in the type ExecutorService is not applicable for
the arguments (List<FileProcessor>)

Ich weiß das ist jetzt keine Thread Frage aber irgendwie blicke ich das jetzt nicht richtig.

Ich kam zwar auch auf die Idee mal einen Typecast zu machen aber irgendwie mag er den ncht
Code:
service.invokeAll((Collection<Callable<FileProcessorCallResult>>)m_fileProcessors);

dann kommt die Meldung:
Cannot cast from List<FileProcessor> to Collection<Callable<FileProcessorCallResult>>

Da ich erstmal nicht annehme dass dies ein Fehler von Java ist, frage ich mich wirklich was ich falsch mache :confused:
 
Hallo,

bist du sicher das das auch eine java.util.List ist? ;-)

Das hier kompiliert bei mir ohne Probleme:
Java:
/**
 * 
 */
package de.tutorials;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author Thomas.Darimont
 * 
 */
public class FileProcessingExample {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {

        List<FileProcesor> fileProcessors = new ArrayList<FileProcesor>();
        List<Future<FileProcessorResult>> futures = Executors
                .newCachedThreadPool().invokeAll(fileProcessors);

    }

    static class FileProcesor implements Callable<FileProcessorResult> {
        @Override
        public FileProcessorResult call() throws Exception {
            return null;
        }
    }

    static class FileProcessorResult {

    }
}

Gruß Tom
 
Hallo Tom,
danke für das Beispiel, ich habe es bei mi rin Eclipse mal kurz reinkopiert.

ich bekomme folgende Fehler:

The method invokeAll(Collection<Callable<T>>) in the type ExecutorService is not applicable for
the arguments (List<Lala.FileProcesor>)

(ich hatte mal nur das Hauptobjekt geändert da ich noch so eine nette Testklasse irgenworumfliegen hatte.
desweiteren meckert er mir die call-Routine an
The method call() of type Lala.FileProcesor must override a superclass method
leigt vermutlich an dem @Override, denn nach entfernen ist es diese Fehlermeldung weg.

Ich arbeite mit einer Java 5 Umgebung. Kann es sein dass Du es mit einer Java 6 erfolgreich kompilieren konntest?

kannst Du dies mal mit Java 5 bei dir probieren, ob Du die selben Probleme hast, wenn nicht, dann ist offenbar irgenwas an meiner Umgebung drollig.
 
Hallo Tom,
Ich habe nun nochmal mit Deinem Beispiel rumgespielt
und habe folgendes:

Code:
package de.equens.filedemon.data;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 *
 *  
 */
public class Lala
{

    /**
     * *
     *
     * @param args
     */
    public static void main(String[] args) throws Exception
    {
        List<Callable<FileProcessorResult>> fileProcessors1 = new ArrayList<Callable<FileProcessorResult>>();
        List<? extends Callable<FileProcessorResult>> fileProcessors2 = new ArrayList<FileProcesor>();
        
        
        fileProcessors1.add(new FileProcesor());
        fileProcessors2.add(new FileProcesor());
        
        ((FileProcesor)fileProcessors1.get(1)).foo();
        
        List<Future<FileProcessorResult>> futures = Executors.newCachedThreadPool().invokeAll(fileProcessors1);
        List<Future<FileProcessorResult>> futures2 = Executors.newCachedThreadPool().invokeAll(fileProcessors2);
        futures.get(1).get().getX();
        
    }

    static class FileProcesor implements Callable<FileProcessorResult>
    {
    	public String foo()
    	{
    		return "hallo";
    	}
        //@Override
        public FileProcessorResult call() throws Exception
        {
            return null;
        }
    }

    static class FileProcessorResult
    {
    	public int getX()
    	{
    		return 1;
    	}
    	
    	
    }
}

die dicken und unterstrichenen telie haben Compilefehler...
1. Stelle:
The method add(capture-of ? extends Callable<Lala.FileProcessorResult>) in the type
List<capture-of ? extends Callable<Lala.FileProcessorResult>> is not applicable for the arguments
(Lala.FileProcesor)

2. Stelle:
The method invokeAll(Collection<Callable<T>>) in the type ExecutorService is not applicable for
the arguments (List<capture-of ? extends Callable<Lala.FileProcessorResult>>)


Es scheint mir hier und auch in anderen ähnlicehn Versuchen ganz so, als ob Java an dieser Stelle nicht in der Lage ist zu erkennen dass
ein Callable<FileProcessorResult> nicht immer Kompatibel ist zur Klasse FileProcesor, obgleich diese besagte Schnittstelle impelemtiert.

naja wenigstens habe ich einen Weg aus der Misere gefunden, aber so toll finde ich es dann nicht wenn man doch wieder typecasten müsste um z.b. and die routine foo() zu kommen. Ich dachte Generics würden das beheben, aber wohl doch nicht immer.

Ob Java 6 dies anders oder besser macht kann ich z.Z. leider nur zu Hause nachgehen :-(
Gnerics sind an mancher Stelle doch komplizierter als ich dachte.

Takidoso
 
Zurück