diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a308480900cb4afbc73df5dc501ef80eb33f3d9..b7a0ed156c76038ce3729b1a3054adab9863cf2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## 1.7.0 * Fully support OpenCL 1.2 barrier() - localBarrier(), globalBarrier() and localGlobalBarrier() * Improved exception handling, stack traces no longer double print and Error and other throwables are never caught. +* Fix issue #62 - SEVERE log messages on Aparapi kernel profiling under multithreading +* Provide new interfaces for thread safe kernel profiling (multiple threads calling same kernel class on same device) ## 1.6.0 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 3e27fafdbb722a105b5a0524f8fc3d9f6c22ea53..22b73081167099970dab4f182889d6a4d84c6454 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -46,3 +46,4 @@ Below are some of the specific details of various contributions. * Luis Mendes submited PR to support passing functions arguments containing Local arrays - issue #79 * Luis Mendes submited PR for issue #81 - Full OpenCL 1.2 atomics support with AtomicInteger * Luis Mendes submited PR for issue #84 - Fully support OpenCL 1.2 barrier() - localBarrier(), globalBarrier() and localGlobalBarrier() +* Luis Mendes with suggestions by Automenta submitted PR for issue #62 and implemented new thread-safe API for Kernel profiling \ No newline at end of file diff --git a/pom.xml b/pom.xml index 545416fac00ce3c375b4dfa73b52599138ea9fd7..a80e286dcbd74dac714a2dc0b426bcb5584b2425 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,3 @@ -<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -22,6 +21,8 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> 
<maven.compiler.target>1.7</maven.compiler.target> + <maven.compiler.testSource>1.8</maven.compiler.testSource> + <maven.compiler.testTarget>1.8</maven.compiler.testTarget> </properties> <name>Aparapi</name> diff --git a/src/main/java/com/aparapi/IProfileReportObserver.java b/src/main/java/com/aparapi/IProfileReportObserver.java new file mode 100644 index 0000000000000000000000000000000000000000..43c17ba895151141dc7cf891527db2e645df4da2 --- /dev/null +++ b/src/main/java/com/aparapi/IProfileReportObserver.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.aparapi; + +import java.lang.ref.WeakReference; + +import com.aparapi.device.Device; + +/** + * Defines interface for listener/observer of Kernel profile reports + * + * @author lpnm + */ +public interface IProfileReportObserver { + + /** + * The listener method will be invoked each time a profile report becomes available for each Aparapi Kernel which has + * a registered observer.<br/> + * <b>Note1: </b>A report will be generated by a thread executing a kernel, but if multiple threads execute the same kernel, + * on the same device, the report rate is limited to a single thread at a time per kernel per device.<br/> + * <br/> + * <b>Note2: </b>When an observer is registered there is no need to acknowledge the reception of a profile report, a new + * one will be automatically generated when another thread runs the same kernel on the same device. + * <br/> + * @param kernelClass the class of the kernel to which the profile report pertains + * @param device the device on which the kernel ran, producing the profile report + * @param profileInfo the profile report for the given Aparapi kernel and device pair + */ + public void receiveReport(final Class<? extends Kernel> kernelClass, final Device device, final WeakReference<ProfileReport> profileInfo); + +} diff --git a/src/main/java/com/aparapi/Kernel.java b/src/main/java/com/aparapi/Kernel.java index c489eae2d6590b6198c53b8c54aa14bf2a59aa14..c901a2b01611f6a3f250d13133df47e01b66e3a4 100644 --- a/src/main/java/com/aparapi/Kernel.java +++ b/src/main/java/com/aparapi/Kernel.java @@ -53,7 +53,6 @@ under those regulations, please refer to the U.S. 
Bureau of Industry and Securit package com.aparapi; import com.aparapi.annotation.Experimental; -import com.aparapi.exception.DeprecatedException; import com.aparapi.internal.model.CacheEnabler; import com.aparapi.internal.model.ClassModel.ConstantPool.MethodReferenceEntry; import com.aparapi.internal.model.ClassModel.ConstantPool.NameAndTypeEntry; @@ -67,6 +66,7 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.lang.ref.WeakReference; import java.lang.reflect.Method; import java.util.ArrayDeque; import java.util.Arrays; @@ -87,6 +87,7 @@ import com.aparapi.device.Device; import com.aparapi.device.JavaDevice; import com.aparapi.device.OpenCLDevice; import com.aparapi.internal.kernel.KernelArg; +import com.aparapi.internal.kernel.KernelDeviceProfile; import com.aparapi.internal.kernel.KernelManager; import com.aparapi.internal.kernel.KernelProfile; import com.aparapi.internal.kernel.KernelRunner; @@ -2499,11 +2500,115 @@ public abstract class Kernel implements Cloneable { } /** - * Determine the execution time of the previous Kernel.execute(range) call. + * Registers a new profile report observer to receive profile reports as they're produced. + * This is the method recommended when the client application desires to receive all the execution profiles + * for the current kernel instance on all devices over all client threads running such kernel with a single observer<br/> + * <b>Note1: </b>A report will be generated by a thread that finishes executing a kernel. In multithreaded execution + * environments it is up to the observer implementation to handle thread safety. + * <br/> + * <b>Note2: </b>To cancel the report subscription just set observer to <code>null</code> value. 
+ * <br/> + * @param observer the observer instance that will receive the profile reports + */ + public void registerProfileReportObserver(IProfileReportObserver observer) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + synchronized (profile) { + profile.setReportObserver(observer); + } + } + + /** + * Retrieves a profile report for the last thread that executed this kernel on the given device.<br/> + * A report will only be available if at least one thread executed the kernel on the device. + * + * <b>Note1: </b>If the profile report is intended to be kept in memory, the object should be cloned with + * {@link com.aparapi.ProfileReport#clone()}<br/> + * + * @param device the relevant device where the kernel executed + * + * @return <ul><li>the profiling report for the current most recent execution</li> + * <li>null, if an observer is registered or no profiling report is available for such thread</li></ul> * - * Note that for the first call this will include the conversion time. + * @see #getProfileReportCurrentThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see #getAccumulatedExecutionTimeAllThreads(Device) + * + * @see #getExecutionTimeLastThread() + * @see #getConversionTimeLastThread() + */ + public WeakReference<ProfileReport> getProfileReportLastThread(Device device) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + KernelDeviceProfile deviceProfile = null; + boolean hasObserver = false; + synchronized (profile) { + if (profile.getReportObserver() != null) { + hasObserver = true; + } + deviceProfile = profile.getDeviceProfile(device); + } + + if (hasObserver) { + return null; + } + + return deviceProfile.getReportLastThread(); + } + + /** + * Retrieves the most recent complete report available for the current thread calling this method for + * the current kernel instance and executed on the given device.<br/> + * <b>Note1: </b>If the profile report is intended to be kept in memory, the object 
should be cloned with + * {@link com.aparapi.ProfileReport#clone()}<br/> + * <b>Note2: </b>If the thread didn't execute this kernel on the specified device, it + * will return null. + * + * @param device the relevant device where the kernel executed + * + * @return <ul><li>the profiling report for the current most recent execution</li> + * <li>null, if an observer is registered or no profiling report is available for such thread</li></ul> + * + * @see #getProfileReportLastThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see #getExecutionTimeCurrentThread(Device) + * @see #getConversionTimeCurrentThread(Device) + * @see #getAccumulatedExecutionTimeAllThreads(Device) + */ + public WeakReference<ProfileReport> getProfileReportCurrentThread(Device device) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + KernelDeviceProfile deviceProfile = null; + boolean hasObserver = false; + synchronized (profile) { + if (profile.getReportObserver() != null) { + hasObserver = true; + } + deviceProfile = profile.getDeviceProfile(device); + } + + if (hasObserver) { + return null; + } + + return deviceProfile.getReportCurrentThread(); + } + + /** + * Determine the execution time of the previous Kernel.execute(range) called from the last thread that ran and + * executed on the most recently used device. 
+ * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of either + * {@link #getProfileReportLastThread(Device)} or {@link #registerProfileReportObserver(IProfileReportObserver)} + * is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel, or when running kernels on more than one device concurrently.<br/> + * <br/> + * Note that for the first call this will include the conversion time.<br/> + * <br/> + * @return <ul><li>The time spent executing the kernel (ms)</li> + * <li>NaN, if no profile report is available</li></ul> * - * @return The time spent executing the kernel (ms) + * @see #getProfileReportCurrentThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see #getAccumulatedExecutionTimeAllThreads(Device) * * @see #getConversionTime(); * @see #getAccumulatedExecutionTime(); @@ -2517,37 +2622,123 @@ public abstract class Kernel implements Cloneable { } /** - * Determine the total execution time of all previous Kernel.execute(range) calls. + * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. 
+ * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of either + * {@link #getProfileReportLastThread(Device)} or {@link #registerProfileReportObserver(IProfileReportObserver)} + * is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel, or when running kernels on more than one device concurrently.<br/> + * <br/> + * Note that for the first call this will include the conversion time.<br/> + * <br/> + * @return <ul><li>The time spent preparing the kernel for execution using GPU</li> + * <li>NaN, if no profile report is available</li></ul> * + * + * @see #getProfileReportCurrentThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see #getAccumulatedExecutionTimeAllThreads(Device) + * + * @see #getAccumulatedExecutionTime(); + * @see #getExecutionTime(); + */ + public double getConversionTime() { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + synchronized (profile) { + return profile.getLastConversionTime(); + } + } + + /** + * Determine the total execution time of all previous kernel executions called from the current thread, + * calling this method, that executed the current kernel on the specified device. + * <br/> + * <b>Note1: </b>This is the recommended method to retrieve the accumulated execution time for a single + * current thread, even when doing multithreading for the same kernel and device. + * <br/> * Note that this will include the initial conversion time. 
+ * - * @return The total time spent executing the kernel (ms) + * @param device the device of interest where the kernel executed * - * @see #getExecutionTime(); - * @see #getConversionTime(); + * @return <ul><li>The total time spent executing the kernel (ms)</li> + * <li>NaN, if no profiling information is available</li></ul> * + * @see #getProfileReportCurrentThread(Device) + * @see #getProfileReportLastThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see #getAccumulatedExecutionTimeAllThreads(Device) */ - public double getAccumulatedExecutionTime() { + public double getAccumulatedExecutionTimeCurrentThread(Device device) { KernelProfile profile = KernelManager.instance().getProfile(getClass()); synchronized (profile) { - return profile.getAccumulatedTotalTime(); + KernelDeviceProfile deviceProfile = profile.getDeviceProfile(device); + if (deviceProfile == null) { + return Double.NaN; + } + return deviceProfile.getCumulativeElapsedTimeAllCurrentThread() / KernelProfile.MILLION; + } + } + + /** + * Determine the total execution time of all produced profile reports from all threads that executed the + * current kernel on the specified device. + * <br/> + * <b>Note1: </b>This is the recommended method to retrieve the accumulated execution time, even + * when doing multithreading for the same kernel and device. + * <br/> + * Note that this will include the initial conversion time. 
+ * + * @param device the device of interest where the kernel executed + * + * @return <ul><li>The total time spent executing the kernel (ms)</li> + * <li>NaN, if no profiling information is available</li></ul> + * + * @see #getProfileReportCurrentThread(Device) + * @see #getProfileReportLastThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * @see Kernel#getAccumulatedExecutionTimeCurrentThread(Device) + */ + public double getAccumulatedExecutionTimeAllThreads(Device device) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + synchronized (profile) { + KernelDeviceProfile deviceProfile = profile.getDeviceProfile(device); + if (deviceProfile == null) { + return Double.NaN; + } + return deviceProfile.getCumulativeElapsedTimeAllGlobal() / KernelProfile.MILLION; } } - + /** - * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. - * @return The time spent preparing the kernel for execution using GPU + * Determine the total execution time of all previous Kernel.execute(range) calls for all threads + * that ran this kernel for the device used in the last kernel execution. + * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of + * {@link #getAccumulatedExecutionTimeAllThreads(Device)} is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel on multiple devices concurrently.<br/> + * <br/> + * Note that this will include the initial conversion time. 
+ * + * @return <ul><li>The total time spent executing the kernel (ms)</li> + * <li>NaN, if no profiling information is available</li></ul> + * + * @see #getAccumulatedExecutionTimeAllThreads(Device) + * @see #getProfileReportLastThread(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) * * @see #getExecutionTime(); - * @see #getAccumulatedExecutionTime(); + * @see #getConversionTime(); + * */ - public double getConversionTime() { + public double getAccumulatedExecutionTime() { KernelProfile profile = KernelManager.instance().getProfile(getClass()); synchronized (profile) { - return profile.getLastConversionTime(); + return profile.getAccumulatedTotalTime(); } } - + /** * Start execution of <code>_range</code> kernels. * <p> diff --git a/src/main/java/com/aparapi/ProfileReport.java b/src/main/java/com/aparapi/ProfileReport.java new file mode 100644 index 0000000000000000000000000000000000000000..de7a8b80ff81801ba3e8262f96dbd63e1ddd484f --- /dev/null +++ b/src/main/java/com/aparapi/ProfileReport.java @@ -0,0 +1,154 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.aparapi; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import com.aparapi.device.Device; +import com.aparapi.internal.kernel.ProfilingEvent; + +public final class ProfileReport { + private static final int NUM_EVENTS = ProfilingEvent.values().length; + private static final double MILLION = 1000000d; + private long id; + private final Class<? extends Kernel> kernelClass; + private final long threadId; + private final Device device; + private final long currentTimes[] = new long[NUM_EVENTS]; + private final String[] stagesNames; + + /** + * Creates a profile report pertaining to a given thread that executed kernel class on the specified device. + * @param _threadId the id of thread that executed the kernel + * @param clazz the class of the executed kernel + * @param _device the device where the kernel executed + */ + public ProfileReport(final long _threadId, final Class<? extends Kernel> clazz, final Device _device) { + threadId = _threadId; + kernelClass = clazz; + device = _device; + stagesNames = ProfilingEvent.getStagesNames(); + } + + /** + * Sets specific report data. + * @param reportId the unique identifier for this report (the identifier is unique within the <kernel,device> tuple) + * @param _currentTimes the profiling data + */ + public void setProfileReport(final long reportId, final long[] _currentTimes) { + id = reportId; + System.arraycopy(_currentTimes, 0, currentTimes, 0, NUM_EVENTS); + } + + /** + * Retrieves the current report unique identifier.<br/> + * <b>Note: </b>The identifier is monotonically incremented at each new report for the current + * <kernel, device> tuple. + * @return the report id + */ + public long getReportId() { + return id; + } + + /** + * Retrieves the thread id of the thread that executed the kernel, producing this profile report. 
+ * @return the thread id + */ + public long getThreadId() { + return threadId; + } + + /** + * Retrieves the class of the kernel to which this profile report pertains to + * @return the Aparapi kernel class + */ + public Class<? extends Kernel> getKernelClass() { + return kernelClass; + } + + /** + * Retrieves the Aparapi device where the kernel was executed, producing this profile report. + * @return the Aparapi device + */ + public Device getDevice() { + return device; + } + + /** + * Get the names of the stages for which data was collected. + * @return the list with the stages names + */ + public List<String> getStageNames() { + return Collections.unmodifiableList(Arrays.asList(stagesNames)); + } + + /** + * The number of stages available with report data. + * @return the number of stages + */ + public int getNumberOfStages() { + return stagesNames.length; + } + + /** + * Get the name of a given stage + * @param stage the index of the stage + * @return the stage name + */ + public String getStageName(int stage) { + return stagesNames[stage]; + } + + /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start. */ + public double getElapsedTime(int stage) { + if (stage == ProfilingEvent.START.ordinal()) { + return 0; + } + return (currentTimes[stage] - currentTimes[stage - 1]) / MILLION; + } + + /** Elapsed time for all events {@code from} through {@code to}.*/ + public double getElapsedTime(int from, int to) { + return (currentTimes[to] - currentTimes[from]) / MILLION; + } + + /** + * Determine the execution time of the Kernel.execute(range) call from this report. + * + * @return The time spent executing the kernel (ms) + */ + public double getExecutionTime() { + return getElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.EXECUTED.ordinal()); + } + + /** + * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. 
+ * + * @return The time spent preparing the kernel for execution using GPU (ms) + */ + public double getConversionTime() { + return getElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.PREPARE_EXECUTE.ordinal()); + } + + @Override + public ProfileReport clone() { + ProfileReport r = new ProfileReport(threadId, kernelClass, device); + r.setProfileReport(id, currentTimes); + return r; + } +} diff --git a/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java b/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java index 16e64715650eaa830c6af5820ef0a4895bb79cac..a45cfd8361595245f654bedaf3b2e18785f01a75 100644 --- a/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java +++ b/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java @@ -18,8 +18,13 @@ package com.aparapi.internal.kernel; import com.aparapi.*; import com.aparapi.device.*; +import java.lang.ref.WeakReference; import java.text.*; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.*; /** @@ -28,19 +33,26 @@ import java.util.logging.*; public class KernelDeviceProfile { private static Logger logger = Logger.getLogger(Config.getLoggerName()); + private static final int NUM_EVENTS = ProfilingEvent.values().length; private static final double MILLION = 1000 * 1000; private static final int TABLE_COLUMN_HEADER_WIDTH = 21; private static final int TABLE_COLUMN_COUNT_WIDTH = 8; private static final int TABLE_COLUMN_WIDTH; private static String tableHeader = null; + + private final KernelProfile parentKernelProfile; private final Class<? 
extends Kernel> kernel; private final Device device; - private long[] currentTimes = new long[ProfilingEvent.values().length]; - private long[] accumulatedTimes = new long[ProfilingEvent.values().length]; - private ProfilingEvent lastEvent = null; private final DecimalFormat format; - private long invocationCount = 0; + private final AtomicLong invocationCountGlobal = new AtomicLong(0); + private final AtomicReference<Accumulator> lastAccumulator = new AtomicReference<Accumulator>(null); + private final GlobalAccumulator globalAcc = new GlobalAccumulator(); + + private final Map<Thread,Accumulator> accs = Collections.synchronizedMap( + new WeakHashMap<Thread,Accumulator>(Runtime.getRuntime().availableProcessors()*2, 0.95f) + ); + static { assert ProfilingEvent.START.ordinal() == 0 : "ProfilingEvent.START.ordinal() != 0"; int max = 0; @@ -50,7 +62,123 @@ public class KernelDeviceProfile { TABLE_COLUMN_WIDTH = max + 1; } - public KernelDeviceProfile(Class<? extends Kernel> kernel, Device device) { + private class GlobalAccumulator { + private final AtomicLongArray accumulatedTimes = new AtomicLongArray(NUM_EVENTS); + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private void accumulateTimes(final long[] currentTimes) { + //Read lock is only exclusive to write lock, thus many threads can update + //the accumulated times simultaneously. + lock.readLock().lock(); + try { + for (int i = 1; i < currentTimes.length; ++i) { + long elapsed = currentTimes[i] - currentTimes[i - 1]; + + accumulatedTimes.addAndGet(i, elapsed); + } + } finally { + lock.readLock().unlock(); + } + } + + private void consultAccumulatedTimes(final long[] accumulatedTimesHolder) { + //Write lock is exclusive to all other locks, so only one thread can retrieve + //the accumulated times at a given moment. 
+ lock.writeLock().lock(); + try { + for (int i = 0; i < NUM_EVENTS; i++) { + accumulatedTimesHolder[i] = accumulatedTimes.get(i); + } + } finally { + lock.writeLock().unlock(); + } + } + } + + private class Accumulator { + private final long threadId; + private final long[] currentTimes = new long[NUM_EVENTS]; + private final long[] accumulatedTimes = new long[NUM_EVENTS]; + private final ProfileReport report; + private final WeakReference<ProfileReport> reportRef; + private ProfilingEvent lastEvent = null; + private int invocationCount = 0; + + private Accumulator(long _threadId) { + threadId = _threadId; + report = new ProfileReport(threadId, kernel, device); + reportRef = new WeakReference<>(report); + } + + private void parseStartEventHelper(final ProfilingEvent event) { + if (event == ProfilingEvent.START) { + if (lastEvent != null) { + logger.log(Level.SEVERE, "ProfilingEvent.START encountered without ProfilingEvent.EXECUTED"); + } else if (lastEvent == ProfilingEvent.START) { + logger.log(Level.SEVERE, "Duplicate event ProfilingEvent.START"); + } + Arrays.fill(currentTimes, 0L); + ++invocationCount; + invocationCountGlobal.incrementAndGet(); + } else { + if (lastEvent == null) { + if (event != ProfilingEvent.EXECUTED) { + logger.log(Level.SEVERE, "ProfilingEvent.START was not invoked prior to ProfilingEvent." 
+ event); + } + } else { + for (int i = lastEvent.ordinal() + 1; i < event.ordinal(); ++i) { + currentTimes[i] = currentTimes[i - 1]; + } + } + } + currentTimes[event.ordinal()] = System.nanoTime(); + if (event == ProfilingEvent.EXECUTED) { + for (int i = 1; i < currentTimes.length; ++i) { + long elapsed = currentTimes[i] - currentTimes[i - 1]; + if (elapsed < 0) { + logger.log(Level.SEVERE, "negative elapsed time for event " + event); + break; + } + accumulatedTimes[i] += elapsed; + } + + globalAcc.accumulateTimes(currentTimes); + lastAccumulator.set(this); + } + } + + private void onEvent(final ProfilingEvent event) { + parseStartEventHelper(event); + + lastEvent = event; + if (event == ProfilingEvent.EXECUTED) { + updateProfileReport(report, invocationCount, currentTimes); + IProfileReportObserver observer = parentKernelProfile.getReportObserver(); + lastEvent = null; + if (observer != null) { + observer.receiveReport(kernel, device, reportRef); + } + } + } + } + + private Accumulator getAccForThreadPutIfAbsent() { + Thread t = Thread.currentThread(); + Accumulator a = accs.get(t); + if (a == null) { + a = new Accumulator(t.getId()); + accs.put(t, a); + } + return a; + } + + private Accumulator getAccForThread() { + Thread t = Thread.currentThread(); + return accs.get(t); + } + + public KernelDeviceProfile(KernelProfile parentProfile, Class<? 
extends Kernel> kernel, Device device) { + this.parentKernelProfile = parentProfile; this.kernel = kernel; this.device = device; this.format = (DecimalFormat) DecimalFormat.getNumberInstance(); @@ -59,65 +187,150 @@ public class KernelDeviceProfile { } public void onEvent(ProfilingEvent event) { - if (event == ProfilingEvent.START) { - if (lastEvent != null) { - logger.log(Level.SEVERE, "ProfilingEvent.START encountered without ProfilingEvent.EXECUTED"); - } else if (lastEvent == ProfilingEvent.START) { - logger.log(Level.SEVERE, "Duplicate event ProfilingEvent.START"); - } - Arrays.fill(currentTimes, 0L); - ++invocationCount; - } else { - if (lastEvent == null) { - if (event != ProfilingEvent.EXECUTED) { - logger.log(Level.SEVERE, "ProfilingEvent.START was not invoked prior to ProfilingEvent." + event); - } - } else { - for (int i = lastEvent.ordinal() + 1; i < event.ordinal(); ++i) { - currentTimes[i] = currentTimes[i - 1]; - } - } - } - currentTimes[event.ordinal()] = System.nanoTime(); - if (event == ProfilingEvent.EXECUTED) { - for (int i = 1; i < currentTimes.length; ++i) { - long elapsed = currentTimes[i] - currentTimes[i - 1]; - if (elapsed < 0) { - logger.log(Level.SEVERE, "negative elapsed time for event " + event); - break; - } - accumulatedTimes[i] += elapsed; - } - } - lastEvent = event; - if (event == ProfilingEvent.EXECUTED) { - lastEvent = null; - } + getAccForThreadPutIfAbsent().onEvent(event); + } + + private ProfileReport updateProfileReport(final ProfileReport report, long invocationCount, long[] currentTimes) { + report.setProfileReport(invocationCount, currentTimes); + + return report; + } + + /** + * Elapsed time for a single event only and for the current thread, i.e. since the previous stage rather than from the start. + * + * + */ + public double getElapsedTimeCurrentThread(int stage) { + if (stage == ProfilingEvent.START.ordinal()) { + return 0; + } + + Accumulator acc = getAccForThread(); + + return acc == null ? 
Double.NaN : (acc.currentTimes[stage] - acc.currentTimes[stage - 1]) / MILLION; + } + + /** Elapsed time for all events {@code from} through {@code to} for the current thread.*/ + public double getElapsedTimeCurrentThread(int from, int to) { + Accumulator acc = getAccForThread(); + + return acc == null ? Double.NaN : (acc.currentTimes[to] - acc.currentTimes[from]) / MILLION; + } + + /** + * Retrieves the most recent complete report available for the current thread calling this method.<br/> + * <b>Note1: </b>If the profile report is intended to be kept in memory, the object should be cloned with + * {@link com.aparapi.ProfileReport#clone()}<br/> + * <b>Note2: </b>If the thread didn't execute this KernelDeviceProfile instance respective kernel and device, it + * will return null. + * @return <ul><li>the profiling report for the current most recent execution</li> + * <li>null, if no profiling report is available for such thread</li></ul> + */ + public WeakReference<ProfileReport> getReportCurrentThread() { + Accumulator acc = getAccForThread(); + + return acc == null ? null : acc.reportRef; + } + + /** + * Retrieves the most recent complete report available for the last thread that executed this KernelDeviceProfile + * instance respective kernel and device.<br/> + * <b>Note1: </b>If the profile report is intended to be kept in memory, the object should be cloned with + * {@link com.aparapi.ProfileReport#clone()}<br/> + * + * @return <ul><li>the profiling report for the current most recent execution</li> + * <li>null, if no profiling report is available yet</li></ul> + */ + public WeakReference<ProfileReport> getReportLastThread() { + Accumulator acc = lastAccumulator.get(); + + return acc == null ? null : acc.reportRef; + } + + /** + * Elapsed time for a single event only, i.e. 
since the previous stage rather than from the start, summed over all executions, + * for the current thread, if it has executed the kernel on the device assigned to this KernelDeviceProfile instance. + * + * @param stage the event stage + */ + public double getCumulativeElapsedTimeCurrrentThread(ProfilingEvent stage) { + Accumulator acc = getAccForThread(); + + return acc == null ? Double.NaN : acc.accumulatedTimes[stage.ordinal()] / MILLION; } + + /** + * Elapsed time of entire execution, summed over all executions, for the current thread, + * if it has executed the kernel on the device assigned to this KernelDeviceProfile instance. + */ + public double getCumulativeElapsedTimeAllCurrentThread() { + double sum = 0; + + Accumulator acc = getAccForThread(); + if (acc == null) { + return sum; + } - /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start. */ - public double getLastElapsedTime(ProfilingEvent stage) { - if (stage == ProfilingEvent.START) { - return 0; + for (int i = 1; i <= ProfilingEvent.EXECUTED.ordinal(); ++i) { + sum += acc.accumulatedTimes[i]; } - return (currentTimes[stage.ordinal()] - currentTimes[stage.ordinal() - 1]) / MILLION; + + return sum; } - /** Elapsed time for all events {@code from} through {@code to}.*/ - public double getLastElapsedTime(ProfilingEvent from, ProfilingEvent to) { - return (currentTimes[to.ordinal()] - currentTimes[from.ordinal()]) / MILLION; + /** + * Elapsed time for a single event only and for the last thread that finished executing a kernel, + * i.e. single event only - since the previous stage rather than from the start. + * @param stage the event stage + */ + public double getElapsedTimeLastThread(int stage) { + if (stage == ProfilingEvent.START.ordinal()) { + return 0; + } + + Accumulator acc = lastAccumulator.get(); + + return acc == null ? 
Double.NaN : (acc.currentTimes[stage] - acc.currentTimes[stage - 1]) / MILLION; } + + /** + * Elapsed time for all events {@code from} through {@code to} for the last thread that executed this KernelDeviceProfile + * instance respective kernel and device. + * + * @param from the first event to consider that defines the elapsed period start + * @param to the last event to consider for elapsed period + */ + public double getElapsedTimeLastThread(int from, int to) { + Accumulator acc = lastAccumulator.get(); + + return acc == null ? Double.NaN : (acc.currentTimes[to] - acc.currentTimes[from]) / MILLION; + } + + /** + * Elapsed time for a single event only, i.e. since the previous stage rather than from the start, summed over all executions, + * for all the threads that executed the kernel on this device. + * + * @param stage the event stage + */ + public double getCumulativeElapsedTimeGlobal(ProfilingEvent stage) { + final long[] accumulatedTimesHolder = new long[NUM_EVENTS]; + globalAcc.consultAccumulatedTimes(accumulatedTimesHolder); - /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start, summed over all executions. */ - public double getCumulativeElapsedTime(ProfilingEvent stage) { - return (accumulatedTimes[stage.ordinal()]) / MILLION; + return accumulatedTimesHolder[stage.ordinal()] / MILLION; } + + /** + * Elapsed time of entire execution, summed over all executions, for all the threads, + * that executed the kernel on this device. + */ + public double getCumulativeElapsedTimeAllGlobal() { + final long[] accumulatedTimesHolder = new long[NUM_EVENTS]; + globalAcc.consultAccumulatedTimes(accumulatedTimesHolder); - /** Elapsed time of entire execution, summed over all executions.
*/ - public double getCumulativeElapsedTimeAll() { double sum = 0; for (int i = 1; i <= ProfilingEvent.EXECUTED.ordinal(); ++i) { - sum += accumulatedTimes[i]; + sum += accumulatedTimesHolder[i]; } return sum; } @@ -138,13 +351,22 @@ public class KernelDeviceProfile { return tableHeader; } - public String getLastAsTableRow() { + public String getLastAsTableRow() { + //At the end of execution profile data may no longer be available due to the weak references, + //thus it is best to use the last report + StringBuilder builder = new StringBuilder(150); + Accumulator acc = lastAccumulator.get(); + if (acc == null) { + appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCountGlobal.get())); + builder.append("No thread available"); + return builder.toString(); + } + double total = 0; - StringBuilder builder = new StringBuilder(150); - appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCount)); - for (int i = 1; i < currentTimes.length; ++i) { + appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCountGlobal.get())); + for (int i = 1; i < NUM_EVENTS; ++i) { ProfilingEvent stage = ProfilingEvent.values()[i]; - double time = getLastElapsedTime(stage); + double time = getElapsedTimeLastThread(stage.ordinal()); total += time; String formatted = format.format(time); appendCell(builder, formatted); @@ -163,12 +385,12 @@ public class KernelDeviceProfile { private String internalCumulativeAsTableRow(boolean mean) { double total = 0; - double count = mean ? invocationCount : 1; + double count = mean ? 
invocationCountGlobal.get() : 1; StringBuilder builder = new StringBuilder(150); - appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCount)); - for (int i = 1; i < currentTimes.length; ++i) { + appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCountGlobal.get())); + for (int i = 1; i < NUM_EVENTS; ++i) { ProfilingEvent stage = ProfilingEvent.values()[i]; - double time = getCumulativeElapsedTime(stage); + double time = getCumulativeElapsedTimeGlobal(stage); if (mean) { time /= count; } diff --git a/src/main/java/com/aparapi/internal/kernel/KernelProfile.java b/src/main/java/com/aparapi/internal/kernel/KernelProfile.java index 56538d0890ed4a512bb6a834e8469737780606c3..6803d8850b4ee7792422091e6c1a2f8424001240 100644 --- a/src/main/java/com/aparapi/internal/kernel/KernelProfile.java +++ b/src/main/java/com/aparapi/internal/kernel/KernelProfile.java @@ -27,13 +27,14 @@ import java.util.logging.*; */ public class KernelProfile { - private static final double MILLION = 1000000d; + public static final double MILLION = 1000000d; private static Logger logger = Logger.getLogger(Config.getLoggerName()); private final Class<? extends Kernel> kernelClass; private LinkedHashMap<Device, KernelDeviceProfile> deviceProfiles = new LinkedHashMap<>(); private Device currentDevice; private Device lastDevice; private KernelDeviceProfile currentDeviceProfile; + private IProfileReportObserver observer; public KernelProfile(Class<? extends Kernel> _kernelClass) { kernelClass = _kernelClass; @@ -41,12 +42,12 @@ public class KernelProfile { public double getLastExecutionTime() { KernelDeviceProfile lastDeviceProfile = getLastDeviceProfile(); - return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START, ProfilingEvent.EXECUTED) / MILLION; + return lastDeviceProfile == null ? 
Double.NaN : lastDeviceProfile.getElapsedTimeLastThread(ProfilingEvent.START.ordinal(), ProfilingEvent.EXECUTED.ordinal()); } public double getLastConversionTime() { KernelDeviceProfile lastDeviceProfile = getLastDeviceProfile(); - return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START, ProfilingEvent.PREPARE_EXECUTE) / MILLION; + return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getElapsedTimeLastThread(ProfilingEvent.START.ordinal(), ProfilingEvent.PREPARE_EXECUTE.ordinal()); } public double getAccumulatedTotalTime() { @@ -55,7 +56,7 @@ public class KernelProfile { return Double.NaN; } else { - return lastDeviceProfile.getCumulativeElapsedTimeAll() / MILLION; + return lastDeviceProfile.getCumulativeElapsedTimeAllGlobal() / MILLION; } } @@ -64,15 +65,16 @@ public class KernelProfile { } void onStart(Device device) { - currentDevice = device; synchronized (deviceProfiles) { currentDeviceProfile = deviceProfiles.get(device); if (currentDeviceProfile == null) { - currentDeviceProfile = new KernelDeviceProfile(kernelClass, device); + currentDeviceProfile = new KernelDeviceProfile(this, kernelClass, device); deviceProfiles.put(device, currentDeviceProfile); } } + currentDeviceProfile.onEvent(ProfilingEvent.START); + currentDevice = device; } void onEvent(ProfilingEvent event) { @@ -118,4 +120,12 @@ public class KernelProfile { public KernelDeviceProfile getDeviceProfile(Device device) { return deviceProfiles.get(device); } + + public void setReportObserver(IProfileReportObserver _observer) { + observer = _observer; + } + + public IProfileReportObserver getReportObserver() { + return observer; + } } diff --git a/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java b/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java index fe6518cb400da52b7bcf73f7675914921beea85e..24a378dbc17164f2c6b383d8929f7819aa39f917 100644 --- a/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java +++ 
b/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java @@ -15,9 +15,30 @@ */ package com.aparapi.internal.kernel; +import java.util.concurrent.atomic.AtomicReference; + /** * Created by Barney on 02/09/2015. */ public enum ProfilingEvent { - START, CLASS_MODEL_BUILT, INIT_JNI, OPENCL_GENERATED, OPENCL_COMPILED, PREPARE_EXECUTE, EXECUTED + START, CLASS_MODEL_BUILT, INIT_JNI, OPENCL_GENERATED, OPENCL_COMPILED, PREPARE_EXECUTE, EXECUTED; + + + static final AtomicReference<String[]> stagesNames = new AtomicReference<String[]>(null); + public static String[] getStagesNames() { + String[] result = null; + result = stagesNames.get(); + if (result == null) { + final String[] names = new String[values().length]; + for (int i = 0; i < values().length; i++) { + names[i] = values()[i].name(); + } + if (stagesNames.compareAndSet(null, names)) { + result = names; + } else { + result = stagesNames.get(); + } + } + return result; + } } diff --git a/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java b/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4c9e00a23e50267b95e313558b1dda29ed87b4e2 --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java @@ -0,0 +1,311 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.aparapi.runtime; + +import static org.junit.Assume.assumeTrue; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.junit.Assert.*; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.aparapi.Config; +import com.aparapi.Kernel; +import com.aparapi.ProfileReport; +import com.aparapi.Range; +import com.aparapi.device.Device; +import com.aparapi.device.JavaDevice; +import com.aparapi.device.OpenCLDevice; +import com.aparapi.internal.kernel.KernelManager; + +/** + * Provides integration tests to help assure backwards compatibility under single thread per kernel per device + * execution environments. + * + * @author CoreRasurae + * + */ +public class ProfileReportBackwardsCompatTest { + private static OpenCLDevice openCLDevice; + + private static Logger logger = Logger.getLogger(Config.getLoggerName()); + + @Rule + public TestName name = new TestName(); + + + private class CLKernelManager extends KernelManager { + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.ACC, Device.TYPE.GPU, Device.TYPE.CPU); + } + } + + private class JTPKernelManager extends KernelManager { + private JTPKernelManager() { + LinkedHashSet<Device> preferredDevices = new LinkedHashSet<Device>(1); + preferredDevices.add(JavaDevice.THREAD_POOL); + setDefaultPreferredDevices(preferredDevices); + } + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.JTP); + } + } + + public void setUpBefore() throws Exception { + KernelManager.setKernelManager(new CLKernelManager()); + Device device = KernelManager.instance().bestDevice(); + if 
(device == null || !(device instanceof OpenCLDevice)) { + logger.log(Level.WARNING, "!!!No OpenCLDevice available for running the integration test - test will be skipped"); + } + assumeTrue (device != null && device instanceof OpenCLDevice); + openCLDevice = (OpenCLDevice) device; + } + + + /** + * This integration test validates that previous Kernel methods for retrieving profiling data + * are still consistent in the new implementation and with the new profile reports. + * @throws Exception + */ + @Test + public void sequentialSingleThreadOpenCLTest() throws Exception { + setUpBefore(); + logger.log(Level.INFO, "Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(sequentialSingleThreadTestHelper(openCLDevice, 128)); + } + + /** + * This integration test validates that previous Kernel methods for retrieving profiling data + * are still consistent in the new implementation and with the new profile reports. 
+ */ + @Test + public void sequentialSingleThreadJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(sequentialSingleThreadTestHelper(device, 16)); + } + + + public boolean sequentialSingleThreadTestHelper(Device device, int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + double accumulatedExecutionTime = 0.0; + double lastExecutionTime = 0.0; + double lastConversionTime = 0.0; + final Basic1Kernel kernel = new Basic1Kernel(); + + int[] outputArray = null; + Range range = device.createRange(size, size); + long startOfExecution = System.currentTimeMillis(); + try { + for (int i = 0; i < runs; i++) { + outputArray = Arrays.copyOf(inputArray, inputArray.length); + kernel.setInputOuputArray(outputArray); + kernel.execute(range); + lastExecutionTime = kernel.getExecutionTime(); + accumulatedExecutionTime += lastExecutionTime; + lastConversionTime = kernel.getConversionTime(); + } + long runTime = System.currentTimeMillis() - startOfExecution; + WeakReference<ProfileReport> reportRef = kernel.getProfileReportLastThread(device); + ProfileReport report = reportRef.get(); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", accumulatedExecutionTime, kernel.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", runTime, accumulatedExecutionTime, 100); + assertTrue(validateBasic1Kernel(inputArray, outputArray)); + } finally { + kernel.dispose(); + } + + return true; + } + + private class TestData { + private int[] outputArray; + 
private double accumulatedExecutionTime = 0.0; + private double lastExecutionTime = 0.0; + private double lastConversionTime = 0.0; + private long startOfExecution = 0; + private long runTime = 0; + } + + /** + * This test executes two threads one for each kernel on an OpenCL device and checks that the traditional Aparapi profiling interfaces work. + * @throws Exception + */ + @Test + public void threadedSingleThreadPerKernelOpenCLTest() throws Exception { + setUpBefore(); + logger.log(Level.INFO, "Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(threadedSingleThreadPerKernelTestHelper(openCLDevice, 128)); + } + + /** + * This test executes two threads one for each kernel on Java Thread Pool and checks that the traditional Aparapi profiling interfaces work. + */ + @Test + public void threadedSingleThreadPerKernelJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(threadedSingleThreadPerKernelTestHelper(device, 16)); + } + + public boolean threadedSingleThreadPerKernelTestHelper(Device device, final int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + + final Basic1Kernel kernel1 = new Basic1Kernel(); + final Basic1Kernel kernel2 = new Basic2Kernel(); + List<Basic1Kernel> kernels = new ArrayList<Basic1Kernel>(2); + kernels.add(kernel1); + kernels.add(kernel2); + + final TestData[] results = new TestData[2]; + results[0] = new TestData(); + results[1] = new TestData(); + + boolean terminatedOk = false; + try { + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + kernels.forEach(k -> executorService.submit(() -> { + results[k.getId() - 1].startOfExecution = System.currentTimeMillis(); + for (int i = 0; i < runs; i++) { + results[k.getId() - 1].outputArray = Arrays.copyOf(inputArray, inputArray.length); + 
k.setInputOuputArray(results[k.getId() - 1].outputArray); + k.execute(Range.create(device, size, size)); + results[k.getId() - 1].lastExecutionTime = k.getExecutionTime(); + results[k.getId() - 1].accumulatedExecutionTime += results[k.getId() - 1].lastExecutionTime; + results[k.getId() - 1].lastConversionTime = k.getConversionTime(); + } + results[k.getId() - 1].runTime = System.currentTimeMillis() - results[k.getId() - 1].startOfExecution; + })); + } finally { + executorService.shutdown(); + try { + terminatedOk = executorService.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException ex) { + //For the purposes of the test this suffices + terminatedOk = false; + } + if (!terminatedOk) { + executorService.shutdownNow(); + } + } + + assertTrue(terminatedOk); + + //Validate kernel1 reports + WeakReference<ProfileReport> reportRef = kernel1.getProfileReportLastThread(device); + ProfileReport report = reportRef.get(); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", results[0].accumulatedExecutionTime, kernel1.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", results[0].lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", results[0].lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", results[0].runTime, results[0].accumulatedExecutionTime, 100); + assertTrue(validateBasic1Kernel(inputArray, results[0].outputArray)); + + //Validate kernel2 reports + reportRef = kernel2.getProfileReportLastThread(device); + report = reportRef.get(); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", 
results[1].accumulatedExecutionTime, kernel2.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", results[1].lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", results[1].lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", results[1].runTime, results[1].accumulatedExecutionTime, 100); + assertTrue(validateBasic2Kernel(inputArray, results[1].outputArray)); + } finally { + kernel1.dispose(); + kernel2.dispose(); + } + + return true; + } + + private boolean validateBasic1Kernel(final int[] inputArray, final int[] resultArray) { + int[] expecteds = Arrays.copyOf(inputArray, inputArray.length); + for (int threadId = 0; threadId < inputArray.length; threadId++) { + expecteds[threadId] += threadId; + } + + assertArrayEquals(expecteds, resultArray); + + return true; + } + + private boolean validateBasic2Kernel(final int[] inputArray, final int[] resultArray) { + int[] expecteds = Arrays.copyOf(inputArray, inputArray.length); + for (int threadId = 0; threadId < inputArray.length; threadId++) { + expecteds[threadId] += threadId+1; + } + + assertArrayEquals(expecteds, resultArray); + + return true; + } + + private class Basic1Kernel extends Kernel { + protected int[] workArray; + + @NoCL + public void setInputOuputArray(int[] array) { + workArray = array; + } + + @NoCL + public int getId() { + return 1; + } + + @Override + public void run() { + int id = getLocalId(); + + workArray[id]+=id; + } + } + + private class Basic2Kernel extends Basic1Kernel { + @Override + @NoCL + public int getId() { + return 2; + } + + @Override + public void run() { + int id = getLocalId(); + + workArray[id]+=id+1; + } + } + +} diff --git a/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java b/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java new file mode 
100644 index 0000000000000000000000000000000000000000..0abfd26aa3d1f39f9ab22bf656749ea97ef37075 --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java @@ -0,0 +1,369 @@ +/** + * Copyright (c) 2016 - 2018 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aparapi.runtime; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.aparapi.Config; +import com.aparapi.IProfileReportObserver; +import com.aparapi.Kernel; +import com.aparapi.ProfileReport; +import com.aparapi.Range; +import com.aparapi.device.Device; 
+import com.aparapi.device.JavaDevice; +import com.aparapi.device.OpenCLDevice; +import com.aparapi.internal.kernel.KernelManager; + +/** + * Provides integration tests to help in assuring that new APIs for ProfileReports are working, + * in single threaded and multi-threaded environments. + * + * @author CoreRasurae + */ +public class ProfileReportNewAPITest { + + private static OpenCLDevice openCLDevice; + + private static Logger logger = Logger.getLogger(Config.getLoggerName()); + + @Rule + public TestName name = new TestName(); + + + private class CLKernelManager extends KernelManager { + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.ACC, Device.TYPE.GPU, Device.TYPE.CPU); + } + } + + private class JTPKernelManager extends KernelManager { + private JTPKernelManager() { + LinkedHashSet<Device> preferredDevices = new LinkedHashSet<Device>(1); + preferredDevices.add(JavaDevice.THREAD_POOL); + setDefaultPreferredDevices(preferredDevices); + } + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.JTP); + } + } + + public void setUpBefore() throws Exception { + KernelManager.setKernelManager(new CLKernelManager()); + Device device = KernelManager.instance().bestDevice(); + if (device == null || !(device instanceof OpenCLDevice)) { + logger.log(Level.WARNING, "!!!No OpenCLDevice available for running the integration test - test will be skipped"); + } + assumeTrue (device != null && device instanceof OpenCLDevice); + openCLDevice = (OpenCLDevice) device; + } + + /** + * Tests the ProfileReport observer interface in a single threaded, single kernel environment running on + * an OpenCL device. 
+ * @throws Exception + */ + @Test + public void singleThreadedSingleKernelObserverOpenCLTest() throws Exception { + setUpBefore(); + logger.log(Level.INFO, "Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(singleThreadedSingleKernelReportObserverTestHelper(openCLDevice, 128)); + } + + /** + * Tests the ProfileReport observer interface in a single threaded, single kernel environment running on + * Java Thread Pool. + */ + @Test + public void singleThreadedSingleKernelObserverJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(singleThreadedSingleKernelReportObserverTestHelper(device, 16)); + } + + private class ThreadTestState { + private double accumulatedElapsedTime = 0.0; + private long receivedReportsCount = 0; + } + + private class ReportObserver implements IProfileReportObserver { + private final ConcurrentSkipListSet<Long> expectedThreadsIds = new ConcurrentSkipListSet<>(); + private final ConcurrentSkipListMap<Long, ThreadTestState> observedThreadsIds = new ConcurrentSkipListMap<>(); + private final Device device; + private final int threads; + private final int runs; + private final boolean[] receivedReportIds; + + private ReportObserver(Device _device, int _threads, int _runs) { + device = _device; + threads = _threads; + runs = _runs; + + receivedReportIds = new boolean[threads * runs]; + } + + private void addAcceptedThreadId(long threadId) { + expectedThreadsIds.add(threadId); + } + + private ConcurrentSkipListMap<Long, ThreadTestState> getObservedThreadsIds() { + return observedThreadsIds; + } + + @Override + public void receiveReport(Class<? 
extends Kernel> kernelClass, Device _device, WeakReference<ProfileReport> profileInfoRef) { + ProfileReport profileInfo = profileInfoRef.get(); + assertEquals("Kernel class does not match", Basic1Kernel.class, kernelClass); + assertEquals("Device does not match", device, _device); + boolean isThreadAccepted = expectedThreadsIds.contains(profileInfo.getThreadId()); + assertTrue("Thread generating the report (" + profileInfo.getThreadId() + + ") is not among the accepted ones: " + expectedThreadsIds.toString(), isThreadAccepted); + Long threadId = profileInfo.getThreadId(); + ThreadTestState state = observedThreadsIds.computeIfAbsent(threadId, k -> new ThreadTestState()); + state.accumulatedElapsedTime += profileInfo.getExecutionTime(); + state.receivedReportsCount++; + receivedReportIds[(int)profileInfo.getReportId() - 1] = true; + } + } + + public boolean singleThreadedSingleKernelReportObserverTestHelper(Device device, int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + final Basic1Kernel kernel = new Basic1Kernel(); + + int[] outputArray = null; + Range range = device.createRange(size, size); + + ReportObserver observer = new ReportObserver(device, 1, runs); + observer.addAcceptedThreadId(Thread.currentThread().getId()); + kernel.registerProfileReportObserver(observer); + + for (int i = 0; i < runs; i++) { + assertFalse("Report with id " + i + " shouldn't have been received yet", observer.receivedReportIds[i]); + } + + long startOfExecution = System.currentTimeMillis(); + try { + for (int i = 0; i < runs; i++) { + outputArray = Arrays.copyOf(inputArray, inputArray.length); + kernel.setInputOuputArray(outputArray); + kernel.execute(range); + } + long runTime = System.currentTimeMillis() - startOfExecution; + ConcurrentSkipListMap<Long, ThreadTestState> results = observer.getObservedThreadsIds(); + ThreadTestState state = results.get(Thread.currentThread().getId()); + assertNotNull("Reports should have been received for thread", state); 
+ + assertEquals("Number of profiling reports doesn't match the expected", runs, state.receivedReportsCount); + assertEquals("Aparapi Accumulated execution time doesn't match", kernel.getAccumulatedExecutionTimeAllThreads(device), state.accumulatedElapsedTime, 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 200ms window", runTime, kernel.getAccumulatedExecutionTimeAllThreads(device), 200); + for (int i = 0; i < runs; i++) { + assertTrue("Report with id " + i + " wasn't received", observer.receivedReportIds[i]); + } + assertTrue(validateBasic1Kernel(inputArray, outputArray)); + } finally { + kernel.dispose(); + } + + return true; + } + + /** + * Tests the ProfileReport observer interface in a multi threaded, single kernel environment running on + * an OpenCL device. + */ + @Test + public void multiThreadedSingleKernelObserverOpenCLTest() throws Exception { + setUpBefore(); + logger.log(Level.INFO, "Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(multiThreadedSingleKernelReportObserverTestHelper(openCLDevice, 128)); + } + + /** + * Tests the ProfileReport observer interface in a multi threaded, single kernel environment running on + * Java Thread Pool. 
+     */
+    @Test
+    public void multiThreadedSingleKernelObserverJTPTest() throws Exception {
+        // Install the JTP kernel manager and run the shared helper on whatever device it reports as best,
+        // using 16 work items (the helper uses this value as both global and local range size).
+        KernelManager.setKernelManager(new JTPKernelManager());
+        Device device = KernelManager.instance().bestDevice();
+        assertTrue(multiThreadedSingleKernelReportObserverTestHelper(device, 16));
+    }
+
+    /**
+     * Per-worker measurements filled in by the runner and verified by the helper.
+     * One instance is written by exactly one worker task and read by the test thread
+     * only after the corresponding {@link Future#get()} has returned.
+     */
+    private static class ThreadResults {
+        /** Wall-clock duration, in milliseconds, of all kernel executions performed by the worker. */
+        private long runTime;
+        /** Id of the Java thread that ran the worker task. */
+        private long threadId;
+        /** Number of completed kernel executions. */
+        private int kernelCalls;
+        /** Value reported by the kernel's per-thread accumulated execution time after the last run. */
+        private double accumulatedExecutionTime;
+        /** Array handed to the kernel on the last run (a fresh copy of the input array). */
+        private int[] outputArray;
+    }
+
+    /**
+     * Submits one task per kernel to the given executor, waits for all of them and then shuts the executor down.
+     * Each task claims the next free slot in {@code results}, registers its thread id with the observer and
+     * executes its kernel {@code runs} times on a fresh copy of {@code inputArray}.
+     *
+     * @param executorService executor that runs the worker tasks; it is always shut down before this method returns
+     * @param kernels kernels to execute, one task per kernel
+     * @param results pre-populated result slots, one per kernel
+     * @param inputArray input data; copied before every kernel execution
+     * @param runs number of kernel executions per task
+     * @param javaThreads expected number of tasks (only used to pre-size the list of futures)
+     * @param device device the kernels are executed on
+     * @param observer report observer that must be told which thread ids to accept
+     * @param size global and local range size for each execution
+     * @return {@code true} if the executor terminated within one minute, {@code false} otherwise
+     * @throws InterruptedException if interrupted while waiting for a task to finish
+     * @throws ExecutionException if any task threw an exception
+     */
+    public boolean multiThreadedSingleKernelReportObserverTestRunner(final ExecutorService executorService,
+            final List<Basic1Kernel> kernels, final ThreadResults[] results, int[] inputArray, int runs, int javaThreads,
+            final Device device, final ReportObserver observer, int size) throws InterruptedException, ExecutionException {
+        final AtomicInteger atomicResultId = new AtomicInteger(0);
+        boolean terminatedOk = false;
+        try {
+            // submit(Runnable) returns Future<?>, so no unchecked cast is needed to keep the handles.
+            List<Future<?>> futures = new ArrayList<>(javaThreads);
+            for (Basic1Kernel k : kernels) {
+                futures.add(executorService.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Each task claims its own result slot, so slots are never shared between workers.
+                        int id = atomicResultId.getAndIncrement();
+                        results[id].threadId = Thread.currentThread().getId();
+                        observer.addAcceptedThreadId(results[id].threadId);
+                        long startOfExecution = System.currentTimeMillis();
+                        results[id].kernelCalls = 0;
+                        for (int i = 0; i < runs; i++) {
+                            results[id].outputArray = Arrays.copyOf(inputArray, inputArray.length);
+                            k.setInputOuputArray(results[id].outputArray);
+                            k.execute(Range.create(device, size, size));
+                            results[id].kernelCalls++;
+                        }
+                        results[id].runTime = System.currentTimeMillis() - startOfExecution;
+                        results[id].accumulatedExecutionTime = k.getAccumulatedExecutionTimeCurrentThread(device);
+                    }
+                }));
+            }
+            // Waiting on every future surfaces task failures as ExecutionException.
+            for (Future<?> future : futures) {
+                future.get();
+            }
+        } finally {
+            executorService.shutdown();
+            try {
+                terminatedOk = executorService.awaitTermination(1, TimeUnit.MINUTES);
+            } catch (InterruptedException ex) {
+                // For the purposes of the test an interruption counts as a failed shutdown;
+                // restore the interrupt flag so the caller can still observe it.
+                Thread.currentThread().interrupt();
+                terminatedOk = false;
+            }
+            if (!terminatedOk) {
+                executorService.shutdownNow();
+            }
+        }
+
+        return terminatedOk;
+    }
+
+    /**
+     * Runs ten worker threads, each executing its own {@code Basic1Kernel} instance 100 times on the given device,
+     * with a single shared report observer, and validates the per-thread report counts, the accumulated times and
+     * the computed output arrays.
+     *
+     * @param device device the kernels are executed on
+     * @param size length of the input array and range size of each execution
+     * @return always {@code true}; failures are signalled through JUnit assertions
+     * @throws InterruptedException if interrupted while waiting for the workers
+     * @throws ExecutionException if a worker task threw an exception
+     */
+    public boolean multiThreadedSingleKernelReportObserverTestHelper(Device device, int size) throws InterruptedException, ExecutionException {
+        final int runs = 100;
+        final int javaThreads = 10;
+        final int[] inputArray = new int[size];
+        ExecutorService executorService = Executors.newFixedThreadPool(javaThreads);
+
+        final ReportObserver observer = new ReportObserver(device, javaThreads, runs);
+
+        // Sanity check: nothing may be flagged as received before any kernel has run.
+        for (int i = 0; i < runs; i++) {
+            assertFalse("Report with id " + i + " shouldn't have been received yet", observer.receivedReportIds[i]);
+        }
+
+        final List<Basic1Kernel> kernels = new ArrayList<>(javaThreads);
+        for (int i = 0; i < javaThreads; i++) {
+            final Basic1Kernel kernel = new Basic1Kernel();
+            kernel.registerProfileReportObserver(observer);
+            kernels.add(kernel);
+        }
+
+        final ThreadResults[] results = new ThreadResults[javaThreads];
+        for (int i = 0; i < results.length; i++) {
+            results[i] = new ThreadResults();
+        }
+
+        boolean terminatedOk = multiThreadedSingleKernelReportObserverTestRunner(executorService, kernels, results,
+                inputArray, runs, javaThreads, device, observer, size);
+
+        assertTrue("Threads did not terminate correctly", terminatedOk);
+
+        double allThreadsAccumulatedTime = 0;
+        ConcurrentSkipListMap<Long, ThreadTestState> states = observer.getObservedThreadsIds();
+        assertEquals("Number of Java threads sending profile reports should match the number of JavaThreads", javaThreads, states.values().size());
+        for (int i = 0; i < javaThreads; i++) {
+            ThreadTestState state = states.get(results[i].threadId);
+            assertNotNull("Report should have been received for thread with index " + i, state);
+            assertEquals("Number of total iteration should match number of runs for thread with index " + i, runs, results[i].kernelCalls);
+            assertEquals("Number of received reports should match total number of calls for thread with index " + i, runs, state.receivedReportsCount);
+            assertEquals("Overall elapsed time received in reports doesn't match KernelDeviceProfile.Accumulator for threa with index " + i,
+                    results[i].accumulatedExecutionTime, state.accumulatedElapsedTime, 1e-10);
+            allThreadsAccumulatedTime += state.accumulatedElapsedTime;
+            assertTrue("Thread index " + i + " kernel computation doesn't match the expected", validateBasic1Kernel(inputArray, results[i].outputArray));
+            assertEquals("Runtime is not within 600ms of the kernel estimated", results[i].runTime, state.accumulatedElapsedTime, 600);
+        }
+
+        // NOTE(review): a delta of 1e10 makes this assertion effectively always pass; the per-thread check above
+        // uses 1e-10, so this looks like a dropped minus sign. Confirm that tightening it is safe before changing.
+        assertEquals("Overall kernel execution time doesn't match",
+                kernels.get(0).getAccumulatedExecutionTimeAllThreads(device), allThreadsAccumulatedTime, 1e10);
+
+        return true;
+    }
+
+    /**
+     * Checks that {@code resultArray} equals {@code inputArray} with each element increased by its own index,
+     * which is what {@code Basic1Kernel} computes when the range size equals the array length.
+     *
+     * @param inputArray the data the kernel started from; not modified
+     * @param resultArray the data the kernel produced
+     * @return always {@code true}; a mismatch fails the JUnit array assertion instead
+     */
+    private boolean validateBasic1Kernel(final int[] inputArray, final int[] resultArray) {
+        int[] expecteds = Arrays.copyOf(inputArray, inputArray.length);
+        for (int threadId = 0; threadId < inputArray.length; threadId++) {
+            expecteds[threadId] += threadId;
+        }
+
+        assertArrayEquals(expecteds, resultArray);
+
+        return true;
+    }
+
+    /**
+     * Minimal kernel that adds each work item's local id to the array element at that index.
+     */
+    private class Basic1Kernel extends Kernel {
+        protected int[] workArray;
+
+        /** Sets the array that the kernel reads from and writes to in place. */
+        @NoCL
+        public void setInputOuputArray(int[] array) {
+            workArray = array;
+        }
+
+        /** Returns the constant identifier 1. */
+        @NoCL
+        public int getId() {
+            return 1;
+        }
+
+        @Override
+        public void run() {
+            int id = getLocalId();
+
+            workArray[id]+=id;
+        }
+    }
+}
diff --git a/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java b/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..8e9a297ff38c485b6d1bed4b3aa3f10b898826f6
--- /dev/null
+++ b/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright (c) 2016 - 2018 Syncleus, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.aparapi.runtime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.ref.WeakReference;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import com.aparapi.IProfileReportObserver;
+import com.aparapi.Kernel;
+import com.aparapi.ProfileReport;
+import com.aparapi.device.Device;
+import com.aparapi.device.JavaDevice;
+import com.aparapi.internal.kernel.KernelDeviceProfile;
+import com.aparapi.internal.kernel.KernelProfile;
+import com.aparapi.internal.kernel.ProfilingEvent;
+
+/**
+ * This class provides unit tests to help in validation of thread-safe ProfileReports.
+ *
+ * @author CoreRasurae
+ */
+public class ProfileReportUnitTest {
+
+    /**
+     * Kernel with an empty body. The tests only use its class token to key profiles and reports;
+     * it is never instantiated here, so it does not need a reference to the enclosing test instance.
+     */
+    private static class SimpleKernel extends Kernel {
+
+        @Override
+        public void run() {
+            //Empty method intended
+        }
+    }
+
+    /**
+     * This test validates that all threads can start a profiling process after another thread
+     * that has already started profiling.
+     * <p>
+     * The main thread sends START first and only sends EXECUTED at the very end, so its report must be
+     * the last one delivered to the observer.
+     *
+     * @throws IllegalStateException if the executor does not terminate within one minute
+     * @throws InterruptedException if interrupted while waiting for the executor to terminate
+     */
+    @Test
+    public void testAllThreadCanStartPass() throws IllegalStateException, InterruptedException {
+        final int javaThreads = ProfilingEvent.values().length;
+        final KernelProfile kernelProfile = new KernelProfile(SimpleKernel.class);
+        final KernelDeviceProfile kernelDeviceProfile = new KernelDeviceProfile(kernelProfile, SimpleKernel.class, JavaDevice.THREAD_POOL);
+        final AtomicInteger receivedReports = new AtomicInteger(0);
+        final ConcurrentSkipListSet<Long> onEventAccepted = new ConcurrentSkipListSet<Long>();
+        final AtomicInteger index = new AtomicInteger(0);
+        // One slot per worker thread plus a final slot for the main thread.
+        final long[] threadIds = new long[javaThreads + 1];
+
+        kernelProfile.setReportObserver(new IProfileReportObserver() {
+            @Override
+            public void receiveReport(Class<? extends Kernel> kernelClass, Device device, WeakReference<ProfileReport> profileInfo) {
+                receivedReports.incrementAndGet();
+                onEventAccepted.add(profileInfo.get().getThreadId());
+            }
+        });
+
+        //Ensure that the first thread has started profiling, before testing the others
+        kernelDeviceProfile.onEvent(ProfilingEvent.START);
+
+        // The event list is only used to obtain one task per ProfilingEvent value; the event itself is ignored.
+        List<ProfilingEvent> events = Arrays.asList(ProfilingEvent.values());
+
+        ExecutorService executorService = Executors.newFixedThreadPool(javaThreads);
+        try {
+            events.forEach(evt -> {
+                final int idx = index.getAndIncrement();
+                executorService.submit(() -> {
+                    threadIds[idx] = Thread.currentThread().getId();
+                    kernelDeviceProfile.onEvent(ProfilingEvent.START);
+                    kernelDeviceProfile.onEvent(ProfilingEvent.EXECUTED);
+                });
+            });
+        } finally {
+            executorService.shutdown();
+            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+                executorService.shutdownNow();
+                throw new IllegalStateException("ExecutorService terminated abnormaly");
+            }
+        }
+
+        // After the loop the counter equals javaThreads, i.e. the slot reserved for the main thread.
+        threadIds[index.get()] = Thread.currentThread().getId();
+        for (int i = 0; i < javaThreads; i++) {
+            assertTrue("Report wasn't received for thread with index " + i, onEventAccepted.contains(threadIds[i]));
+        }
+        assertFalse("Report was received for main thread", onEventAccepted.contains(threadIds[javaThreads]));
+        assertEquals("Reports from all threads should have been received", javaThreads, receivedReports.get());
+
+        //Only after this event should the main thread have received a report
+        kernelDeviceProfile.onEvent(ProfilingEvent.EXECUTED);
+
+        assertTrue("Report wasn't received for main thread", onEventAccepted.contains(threadIds[javaThreads]));
+        assertEquals("Reports from all threads should have been received", javaThreads + 1, receivedReports.get());
+    }
+
+    /**
+     * Validates that every stage name is non-empty, resolves to a {@link ProfilingEvent} constant and
+     * appears at the position given by that constant's ordinal.
+     */
+    @Test
+    public void testGetProfilingEventsNames() {
+        String[] stages = ProfilingEvent.getStagesNames();
+        int i = 0;
+        for (String stage : stages) {
+            assertNotNull("Stage is null at index " + i, stage);
+            assertFalse("Stage name is empty at index " + i, stage.isEmpty());
+            ProfilingEvent event = ProfilingEvent.valueOf(stage);
+            assertNotNull("Stage name does not translate to an event", event);
+            assertEquals("Stage name doesn't match correct order", i, event.ordinal());
+            i++;
+        }
+    }
+
+    /**
+     * Validates that a cloned report is a distinct object carrying the same id, kernel class, device and
+     * elapsed times, and that later updates to the original do not show through the clone.
+     */
+    @Test
+    public void testProfileReportClone() {
+        final int reportId = 101;
+        final int threadId = 192;
+        final long[] values = new long[ProfilingEvent.values().length];
+        for (int i = 0; i < values.length; i++) {
+            values[i] = 900 + i;
+        }
+
+        ProfileReport report = new ProfileReport(threadId, SimpleKernel.class, JavaDevice.THREAD_POOL);
+        report.setProfileReport(reportId, values);
+
+        ProfileReport clonedReport = report.clone();
+        // Identity, not equals(), is what must differ between a report and its clone.
+        assertNotSame("Object references shouldn't be the same", report, clonedReport);
+        assertEquals("Report Id doesn't match", reportId, clonedReport.getReportId());
+        assertEquals("Class doesn't match", SimpleKernel.class, clonedReport.getKernelClass());
+        assertEquals("Device doesn't match", JavaDevice.THREAD_POOL, clonedReport.getDevice());
+
+        for (int i = 0; i < values.length; i++) {
+            assertEquals("Values don't match for index " + i, report.getElapsedTime(i), clonedReport.getElapsedTime(i), 1e-10);
+        }
+
+        long[] valuesB = new long[ProfilingEvent.values().length];
+        for (int i = 0; i < valuesB.length; i++) {
+            valuesB[i] = 100 + i*100;
+        }
+        report.setProfileReport(reportId + 1, valuesB);
+
+        // NOTE(review): index 0 is skipped here; presumably its elapsed time does not depend on the
+        // stored values - confirm against ProfileReport.getElapsedTime.
+        for (int i = 1; i < values.length; i++) {
+            assertNotEquals("Values match after new assingment for index " + i, report.getElapsedTime(i), clonedReport.getElapsedTime(i), 1e-10);
+        }
+
+    }
+}