Thomas Darimont
Erfahrenes Mitglied
Hallo,
so sollte es jetzt funktionieren. Die Threadinformation hängt nun nicht mehr am Thread sondern am Runnable.
Gruß Tom
so sollte es jetzt funktionieren. Die Threadinformation hängt nun nicht mehr am Thread sondern am Runnable.
Java:
/**
*
*/
package de.tutorials;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author Thomas.Darimont
*
*/
public class CreationInfoAwareThreadingExample {
/**
* @param args
*/
public static void main(String[] args) {
final ExecutorService executorService = (ExecutorService) Proxy
.newProxyInstance(Object.class.getClassLoader(),
new Class[] { ExecutorService.class },
new ThreadCreationInfoAwareExecutorInterceptor(Executors
.newCachedThreadPool()));
executorService.execute(createJob(executorService)); //main
executorService.execute(createJob(executorService)); //main
executorService.execute(createJob(executorService)); //main
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
executorService.execute(createJob(executorService)); //pool thread
}
});
}
static class ThreadCreationInfoAwareExecutorInterceptor implements
InvocationHandler {
final ExecutorService executorService;
/**
* @param executorService
*/
public ThreadCreationInfoAwareExecutorInterceptor(
ExecutorService executorService) {
super();
this.executorService = executorService;
}
public Object invoke(final Object proxy, final Method method,
final Object[] args) throws Throwable {
if (args[0] instanceof ThreadCreationInfoAwareRunnable) {
final ThreadCreationInfo threadCreationInfo = new ThreadCreationInfo();
threadCreationInfo.setCreatorThread(new WeakReference<Thread>(
Thread.currentThread()));
threadCreationInfo.setCreatorThreadName(Thread.currentThread()
.getName());
threadCreationInfo.setStartTimeStackTraceOfCreatorThread(Thread
.currentThread().getStackTrace());
final ThreadCreationInfoAwareRunnable delegateeRunnable = (ThreadCreationInfoAwareRunnable) args[0];
Runnable delegatingRunnable = new Runnable() {
public void run() {
delegateeRunnable
.setThreadCreationInfo(threadCreationInfo);
delegateeRunnable.run();
}
};
args[0] = delegatingRunnable;
}
return method.invoke(this.executorService, args);
}
}
static class ThreadCreationInfo {
WeakReference<Thread> creatorThread;
String creatorThreadName;
StackTraceElement[] startTimeStackTraceOfCreatorThread;
/**
* @return the creatorThread
*/
public WeakReference<Thread> getCreatorThread() {
return creatorThread;
}
/**
* @param creatorThread
* the creatorThread to set
*/
public void setCreatorThread(WeakReference<Thread> creatorThread) {
this.creatorThread = creatorThread;
}
/**
* @return the creatorThreadName
*/
public String getCreatorThreadName() {
return creatorThreadName;
}
/**
* @param creatorThreadName
* the creatorThreadName to set
*/
public void setCreatorThreadName(String creatorThreadName) {
this.creatorThreadName = creatorThreadName;
}
/**
* @return the startTimeStackTraceOfCreatorThread
*/
public StackTraceElement[] getStartTimeStackTraceOfCreatorThread() {
return startTimeStackTraceOfCreatorThread;
}
/**
* @param startTimeStackTraceOfCreatorThread
* the startTimeStackTraceOfCreatorThread to set
*/
public void setStartTimeStackTraceOfCreatorThread(
StackTraceElement[] startTimeStackTraceOfCreatorThread) {
this.startTimeStackTraceOfCreatorThread = startTimeStackTraceOfCreatorThread;
}
}
static interface ThreadCreationInfoAwareRunnable extends Runnable {
ThreadCreationInfo getThreadCreationInfo();
void setThreadCreationInfo(ThreadCreationInfo threadCreationInfo);
}
/**
* @param executorService
* @return
*/
private static Runnable createJob(final ExecutorService executorService) {
Runnable job = new ThreadCreationInfoAwareRunnable() {
ThreadCreationInfo threadCreationInfo;
public void run() {
final String jobId = "" + hashCode();
System.out.println("Executing job: " + jobId);
Callable<String> task = createTaskFor(this);
final List<Future<String>> futureTaskResults = new CopyOnWriteArrayList<Future<String>>();
for (int i = 0; i < 3; i++) {
futureTaskResults.add(executorService.submit(task));
}
Callable<List<String>> resultRetriever = createResultRetriever(
jobId, futureTaskResults);
Future<List<String>> futureResults = executorService
.submit(resultRetriever);
try {
System.out.printf("finished job: %s with results: %s\n",
jobId, futureResults.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* @return the creationThreadInfo
*/
public ThreadCreationInfo getThreadCreationInfo() {
return threadCreationInfo;
}
/**
* @param threadCreationInfo
* the creationThreadInfo to set
*/
public void setThreadCreationInfo(
ThreadCreationInfo threadCreationInfo) {
this.threadCreationInfo = threadCreationInfo;
}
};
return job;
}
/**
*
* @param jobId
* @param futureTaskResults
* @return
*/
private static Callable<List<String>> createResultRetriever(
final String jobId, final List<Future<String>> futureTaskResults) {
Callable<List<String>> resultRetriever = new Callable<List<String>>() {
public List<String> call() throws Exception {
List<String> results = new ArrayList<String>();
while (!futureTaskResults.isEmpty()) {
for (Future<String> futureTaskResult : futureTaskResults) {
System.out
.println("Checking for completion of tasks in job: "
+ jobId);
try {
results.add(futureTaskResult.get(3,
TimeUnit.SECONDS));
futureTaskResults.remove(futureTaskResult);
} catch (TimeoutException e) {
System.out
.printf(
"Got time out while checking for completion of tasks in job: %s trying again...\n",
jobId);
continue;
// e.printStackTrace();
}
}
}
return results;
}
};
return resultRetriever;
}
/**
*
* @param jobId
* @return
*/
private static Callable<String> createTaskFor(
final ThreadCreationInfoAwareRunnable job) {
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
String creatorThreadName = job.getThreadCreationInfo().getCreatorThreadName();
// ...
System.out.println("call task -> creatorThread: "
+ creatorThreadName);
String taskId = String.valueOf(System.nanoTime());
System.out.printf("Executing task %s within: %s\n", taskId, job
.hashCode());
TimeUnit.SECONDS.sleep((long) (Math.random() * 10 + 3));
System.out.printf("Completed task: %s within: %s\n", taskId,
job.hashCode());
return taskId;
}
};
return task;
}
}
Gruß Tom