From a2abc30b9111d496c974a3d612f9a42a0f6347ef Mon Sep 17 00:00:00 2001 From: CoreRasurae <luis.p.mendes@gmail.com> Date: Sat, 21 Apr 2018 00:28:43 +0100 Subject: [PATCH] Fix and Update: Fix issue #62 and provide new API for kernel profiling under multithreading (refs #62) --- CHANGELOG.md | 2 + CONTRIBUTORS.md | 1 + pom.xml | 3 +- .../com/aparapi/IProfileReportObserver.java | 42 +++ src/main/java/com/aparapi/Kernel.java | 192 +++++++++- src/main/java/com/aparapi/ProfileReport.java | 142 ++++++++ .../internal/kernel/KernelDeviceProfile.java | 83 ++++- .../internal/kernel/KernelProfile.java | 28 +- .../internal/kernel/ProfilingEvent.java | 23 +- .../ProfileReportBackwardsCompatTest.java | 285 +++++++++++++++ .../runtime/ProfileReportNewAPITest.java | 339 ++++++++++++++++++ .../runtime/ProfileReportUnitTest.java | 263 ++++++++++++++ 12 files changed, 1364 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/aparapi/IProfileReportObserver.java create mode 100644 src/main/java/com/aparapi/ProfileReport.java create mode 100644 src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java create mode 100644 src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java create mode 100644 src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 25a62ac9..ef30c21c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## 1.7.0 * Fully support OpenCL 1.2 barrier() - localBarrier(), globalBarrier() and localGlobalBarrier() +* Fix issue #62 - SEVERE log messages on Aparapi kernel profiling under multithreading +* Provide new interfaces for thread safe kernel profiling (mutiple threads calling same kernel class on same device) ## 1.6.0 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 536de89f..6ede35f1 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -45,3 +45,4 @@ Below are some of the specific details of various contributions. * Luis Mendes submited PR to support passing functions arguments containing Local arrays - issue #79 * Luis Mendes submited PR for issue #81 - Full OpenCL 1.2 atomics support with AtomicInteger * Luis Mendes submited PR for issue #84 - Fully support OpenCL 1.2 barrier() - localBarrier(), globalBarrier() and localGlobalBarrier() +* Luis Mendes submited PR for issue #62 and implemented new thread-sage API for Kernel profiling \ No newline at end of file diff --git a/pom.xml b/pom.xml index 545416fa..a80e286d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,3 @@ -<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -22,6 +21,8 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> + <maven.compiler.testSource>1.8</maven.compiler.testSource> + <maven.compiler.testTarget>1.8</maven.compiler.testTarget> </properties> <name>Aparapi</name> diff --git a/src/main/java/com/aparapi/IProfileReportObserver.java b/src/main/java/com/aparapi/IProfileReportObserver.java new file mode 100644 index 00000000..5c328f3a --- /dev/null +++ b/src/main/java/com/aparapi/IProfileReportObserver.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2016 - 2017 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aparapi; + +import com.aparapi.device.Device; + +/** + * Defines interface for listener/observer of Kernel profile reports + * + * @author lpnm + */ +public interface IProfileReportObserver { + + /** + * The listener method will be invoked each time a profile report becomes available for each Aparapi Kernel which has + * a registered observer.<br/> + * <b>Note1: </b>A report will be generated by a thread executing a kernel, but if multiple threads execute the same kernel, + * on the same device, the report rate is limited to a single thread at a time per kernel per device.<br/> + * <br/> + * <b>Note2: </b>When an observer is registered there is no need to acknowledge the reception of a profile report, a new + * one will be automatically generated when another thread runs the same kernel on the same device. + * <br/> + * @param kernelClass the class of the kernel to which the profile report pertains + * @param device the device on which the kernel ran, producing the profile report + * @param profileInfo the profile report for the given Aparapi kernel and device pair + */ + public void receiveReport(final Class<? extends Kernel> kernelClass, final Device device, final ProfileReport profileInfo); + +} diff --git a/src/main/java/com/aparapi/Kernel.java b/src/main/java/com/aparapi/Kernel.java index 843b9d50..7b08bc1b 100644 --- a/src/main/java/com/aparapi/Kernel.java +++ b/src/main/java/com/aparapi/Kernel.java @@ -53,7 +53,6 @@ under those regulations, please refer to the U.S. Bureau of Industry and Securit package com.aparapi; import com.aparapi.annotation.Experimental; -import com.aparapi.exception.DeprecatedException; import com.aparapi.internal.model.CacheEnabler; import com.aparapi.internal.model.ClassModel.ConstantPool.MethodReferenceEntry; import com.aparapi.internal.model.ClassModel.ConstantPool.NameAndTypeEntry; @@ -87,6 +86,7 @@ import com.aparapi.device.Device; import com.aparapi.device.JavaDevice; import com.aparapi.device.OpenCLDevice; import com.aparapi.internal.kernel.KernelArg; +import com.aparapi.internal.kernel.KernelDeviceProfile; import com.aparapi.internal.kernel.KernelManager; import com.aparapi.internal.kernel.KernelProfile; import com.aparapi.internal.kernel.KernelRunner; @@ -2502,12 +2502,108 @@ public abstract class Kernel implements Cloneable { return kernelRunner; } + /** + * Registers a new profile report observer to receive profile reports as they're produced. + * This is the method recommended when the client application desires to receive as much profile + * reports as possible during the application execution<br/> + * <b>Note1: </b>A report will be generated by a thread executing a kernel, but if multiple threads execute the same kernel, + * on the same device, the report rate is limited to a single thread at a time per kernel per device. + * <br/> + * <b>Note2: </b>To cancel the report subscription just set observer to <code>null</code> value. + * <br/> + * <b>Note3: </b>There is no need to call {@link #acknowledgeProfileReport(Device)} + * <br/> + * @param observer the observer instance that will receive the profile reports + */ + public void registerProfileReportObserver(IProfileReportObserver observer) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + synchronized (profile) { + profile.setReportObserver(observer); + } + } + + /** + * Acknowledges the current report profile for this kernel on the given device, thus enabling + * for another profile report to be generated for the current kernel and specified device.<br/> + * <b>Note1: </b>It is not required to acknowledge reports when only a single and same thread is used + * per kernel per device throughout all the application execution.<br/> + * <b>Note2: </b>This method should be called by the last thread that reads the report data, + * after it has finished using the report.<br/> + * @param device the device for which the profile report is to be acknowledged. + * + * @see #getAccumulatedExecutionTime(Device) + * @see #getProfileReport(Device) + */ + public void acknowledgeProfileReport(Device device) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + KernelDeviceProfile deviceProfile = null; + boolean hasObserver = false; + synchronized (profile) { + if (profile.getReportObserver() != null) { + hasObserver = true; + } + deviceProfile = profile.getDeviceProfile(device); + } + + if (deviceProfile != null && !hasObserver) { + deviceProfile.acknowledgeLastReport(); + } + } + + /** + * Retrieves a profile report for this kernel for a given device.<br/> + * A report will only be available if at least one thread executed the kernel on the + * specified device after the last call to {@link #acknowledgeProfileReport(Device)}. + * <br/> + * <b>Note: </b> This is the preferred method when using more than a single thread to execute + * the same kernel on the same device concurrently. In fact this method can be used in all + * usage circumstances single threaded or not. + * <br/> + * + * @param device the device for which the profile report is to be retrieved. + * @return <ul><li>the profile report instance for the kernel and selected device, if one is available</li> + * <li>null, if no profile report is available</li></ul> + * + * @see #getAccumulatedExecutionTime(Device) + * @see #acknowledgeProfileReport(Device) + */ + public ProfileReport getProfileReport(Device device) { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + KernelDeviceProfile deviceProfile = null; + boolean hasObserver = false; + synchronized (profile) { + if (profile.getReportObserver() != null) { + hasObserver = true; + } + deviceProfile = profile.getDeviceProfile(device); + } + + if (hasObserver) { + return null; + } + + return deviceProfile.getLastReport(); + } + + /** * Determine the execution time of the previous Kernel.execute(range) call. + * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of either + * {@link #getProfileReport(Device)} or {@link #registerProfileReportObserver(IProfileReportObserver)} + * is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel on the same device concurrently - use instead {@link #getProfileReport(Device)}.<br/> + * <br/> + * Note that for the first call this will include the conversion time.<br/> + * <br/> + * @return <ul><li>The time spent executing the kernel (ms)</li> + * <li>NaN, if no profile report is available</li></ul> * - * Note that for the first call this will include the conversion time. - * - * @return The time spent executing the kernel (ms) + * @see #getAccumulatedExecutionTime(Device)); + * @see #acknowledgeProfileReport(); + * @see #getProfileReport(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) * * @see #getConversionTime(); * @see #getAccumulatedExecutionTime(); @@ -2521,37 +2617,97 @@ public abstract class Kernel implements Cloneable { } /** - * Determine the total execution time of all previous Kernel.execute(range) calls. - * - * Note that this will include the initial conversion time. + * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. + * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of either + * {@link #getProfileReport(Device)} or {@link #registerProfileReportObserver(IProfileReportObserver)} + * is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel on the same device concurrently - use instead {@link #getProfileReport(Device)}.<br/> + * <br/> + * Note that for the first call this will include the conversion time.<br/> + * <br/> + * @return <ul><li>The time spent preparing the kernel for execution using GPU</li> + * <li>NaN, if no profile report is available</li></ul> * - * @return The total time spent executing the kernel (ms) * + * @see #getAccumulatedExecutionTime(Device)); + * @see #acknowledgeProfileReport(); + * @see #getProfileReport(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) + * + * @see #getAccumulatedExecutionTime(); * @see #getExecutionTime(); - * @see #getConversionTime(); + */ + public double getConversionTime() { + KernelProfile profile = KernelManager.instance().getProfile(getClass()); + synchronized (profile) { + return profile.getLastConversionTime(); + } + } + + /** + * Determine the total execution time of all produced profile reports for the current kernel and + * specified device. + * <br/> + * <b>Note1: </b>This is the recommended method to retrieve the accumulated execution time, even + * when doing multithreading for the same kernel and device. + * <br/> + * <b>Note2: </b>For the accumulated time to include as many profile reports as possible in a + * multithreading execution environment it is recommended to use an observer with + * {@link #registerProfileReportObserver(IProfileReportObserver)}, otherwise, the + * {@link #acknowledgeProfileReport(Device)} method should be called after each kernel execution.<br/> + * <br/> + * Note that this will include the initial conversion time. * + * @return <ul><li>The total time spent executing the kernel (ms)</li> + * <li>NaN, if no profiling information is available</li></ul> + * + * @see #acknowledgeProfileReport(); + * @see #getProfileReport(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) */ - public double getAccumulatedExecutionTime() { + public double getAccumulatedExecutionTime(Device device) { KernelProfile profile = KernelManager.instance().getProfile(getClass()); synchronized (profile) { - return profile.getAccumulatedTotalTime(); + KernelDeviceProfile deviceProfile = profile.getDeviceProfile(device); + if (deviceProfile == null) { + return Double.NaN; + } + return deviceProfile.getCumulativeElapsedTimeAll() / KernelProfile.MILLION; } } - + /** - * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. - * @return The time spent preparing the kernel for execution using GPU + * Determine the total (only with a single thread per kernel per device) execution + * time of all previous Kernel.execute(range) calls. + * <br/> + * <b>Note1: </b>This is kept for backwards compatibility only, usage of + * {@link #getAccumulatedExecutionTime(Device)} is encouraged instead.<br/> + * <b>Note2: </b>Calling this method is not recommended when using more than a single thread to execute + * the same kernel on the same device concurrently.<br/> + * <br/> + * Note that this will include the initial conversion time. + * + * @return <ul><li>The total time spent executing the kernel (ms)</li> + * <li>NaN, if no profiling information is available</li></ul> + * + * @see #getAccumulatedExecutionTime(Device)); + * @see #acknowledgeProfileReport(); + * @see #getProfileReport(Device) + * @see #registerProfileReportObserver(IProfileReportObserver) * * @see #getExecutionTime(); - * @see #getAccumulatedExecutionTime(); + * @see #getConversionTime(); + * */ - public double getConversionTime() { + public double getAccumulatedExecutionTime() { KernelProfile profile = KernelManager.instance().getProfile(getClass()); synchronized (profile) { - return profile.getLastConversionTime(); + return profile.getAccumulatedTotalTime(); } } - + /** * Start execution of <code>_range</code> kernels. * <p> diff --git a/src/main/java/com/aparapi/ProfileReport.java b/src/main/java/com/aparapi/ProfileReport.java new file mode 100644 index 00000000..b409c3e8 --- /dev/null +++ b/src/main/java/com/aparapi/ProfileReport.java @@ -0,0 +1,142 @@ +/** + * Copyright (c) 2016 - 2017 Syncleus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.aparapi; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import com.aparapi.device.Device; +import com.aparapi.internal.kernel.ProfilingEvent; + +public final class ProfileReport { + private static final double MILLION = 1000000d; + private final long id; + private final Class<? extends Kernel> kernelClass; + private final long threadId; + private final Device device; + private final long currentTimes[]; + private final String[] stagesNames; + + /** + * Creates a new ProfileReport bean with profiling information for a given kernel that ran on the given device, + * for the given thread. + * @param reportId the unique identifier for this report (the identifier is unique within the <kernel,device> tuple) + * @param clazz the kernel class that this report pertains to + * @param _device the device where the kernel ran + * @param _threadId the id of the thread that called the kernel.execute(...) + * @param _currentTimes the profiling data + */ + public ProfileReport(final long reportId, final Class<? extends Kernel> clazz, final Device _device, final long _threadId, + final long[] _currentTimes) { + id = reportId; + kernelClass = clazz; + threadId = _threadId; + device = _device; + currentTimes = Arrays.copyOf(_currentTimes, _currentTimes.length); + stagesNames = ProfilingEvent.getStagesNames(); + } + + /** + * Retrieves the current report unique identifier.<br/> + * <b>Note: </b>The identifier is monotonically incremented at each new report for the current + * <kernel, device> tuple. + * @return the report id + */ + public long getReportId() { + return id; + } + + /** + * Retrieves the thread id of the thread that executed the kernel, producing this profile report. + * @return the thread id + */ + public long getThreadId() { + return threadId; + } + + /** + * Retrieves the class of the kernel to which this profile report pertains to + * @return the Aparapi kernel class + */ + public Class<? extends Kernel> getKernelClass() { + return kernelClass; + } + + /** + * Retrieves the Aparapi device where the kernel was executed, producing this profile report. + * @return the Aparapi device + */ + public Device getDevice() { + return device; + } + + /** + * Get the names of the stages for which data was collected. + * @return the list with the stages names + */ + public List<String> getStageNames() { + return Collections.unmodifiableList(Arrays.asList(stagesNames)); + } + + /** + * The number of stages available with report data. + * @return the number of stages + */ + public int getNumberOfStages() { + return stagesNames.length; + } + + /** + * Get the name of a given stage + * @param stage the index of the stage + * @return the stage name + */ + public String getStageName(int stage) { + return stagesNames[stage]; + } + + /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start. */ + public double getElapsedTime(int stage) { + if (stage == ProfilingEvent.START.ordinal()) { + return 0; + } + return (currentTimes[stage] - currentTimes[stage - 1]) / MILLION; + } + + /** Elapsed time for all events {@code from} through {@code to}.*/ + public double getElapsedTime(int from, int to) { + return (currentTimes[to] - currentTimes[from]) / MILLION; + } + + /** + * Determine the execution time of the Kernel.execute(range) call from this report. + * + * @return The time spent executing the kernel (ms) + */ + public double getExecutionTime() { + return getElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.EXECUTED.ordinal()); + } + + /** + * Determine the time taken to convert bytecode to OpenCL for first Kernel.execute(range) call. + * + * @return The time spent preparing the kernel for execution using GPU (ms) + */ + public double getConversionTime() { + return getElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.PREPARE_EXECUTE.ordinal()); + } +} diff --git a/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java b/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java index c4ca0fb8..49f21135 100644 --- a/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java +++ b/src/main/java/com/aparapi/internal/kernel/KernelDeviceProfile.java @@ -20,6 +20,8 @@ import com.aparapi.device.*; import java.text.*; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.*; /** @@ -29,17 +31,22 @@ public class KernelDeviceProfile { private static Logger logger = Logger.getLogger(Config.getLoggerName()); private static final double MILLION = 1000 * 1000; + private static final long NO_THREAD = -1; private static final int TABLE_COLUMN_HEADER_WIDTH = 21; private static final int TABLE_COLUMN_COUNT_WIDTH = 8; private static final int TABLE_COLUMN_WIDTH; private static String tableHeader = null; + private final Object lock = new Object(); + private final KernelProfile parentKernelProfile; private final Class<? extends Kernel> kernel; private final Device device; + private long threadId = NO_THREAD; private long[] currentTimes = new long[ProfilingEvent.values().length]; private long[] accumulatedTimes = new long[ProfilingEvent.values().length]; private ProfilingEvent lastEvent = null; private final DecimalFormat format; private long invocationCount = 0; + private volatile ProfileReport lastReport; static { assert ProfilingEvent.START.ordinal() == 0 : "ProfilingEvent.START.ordinal() != 0"; @@ -50,7 +57,8 @@ public class KernelDeviceProfile { TABLE_COLUMN_WIDTH = max + 1; } - public KernelDeviceProfile(Class<? extends Kernel> kernel, Device device) { + public KernelDeviceProfile(KernelProfile parentProfile, Class<? extends Kernel> kernel, Device device) { + this.parentKernelProfile = parentProfile; this.kernel = kernel; this.device = device; this.format = (DecimalFormat) DecimalFormat.getNumberInstance(); @@ -58,8 +66,24 @@ public class KernelDeviceProfile { format.setMaximumFractionDigits(3); } - public void onEvent(ProfilingEvent event) { - if (event == ProfilingEvent.START) { + public boolean onEvent(ProfilingEvent event) { + final long currentThreadId = Thread.currentThread().getId(); + + synchronized (lock) { + if (event == ProfilingEvent.START && threadId == NO_THREAD) { + //A new profile report can be started at each new start event, however + //a new thread can only be allowed if currently there is no thread assigned, indicating + //no reports are in progress. + threadId = currentThreadId; + } else if (threadId != currentThreadId) { + //If a report is in progress by another thread, the current cannot start in the middle + return false; + } + //Otherwise if thread is the same that is already authorized or has a report in progress, + //it is allowed to proceed - this also guarantees backwards compatibility. + } + + if (event == ProfilingEvent.START) { if (lastEvent != null) { logger.log(Level.SEVERE, "ProfilingEvent.START encountered without ProfilingEvent.EXECUTED"); } else if (lastEvent == ProfilingEvent.START) { @@ -91,28 +115,61 @@ public class KernelDeviceProfile { } lastEvent = event; if (event == ProfilingEvent.EXECUTED) { + lastReport = createProfileReport(); + IProfileReportObserver observer = parentKernelProfile.getReportObserver(); lastEvent = null; + if (observer != null) { + observer.receiveReport(kernel, device, lastReport); + acknowledgeLastReport(); + } } + + return true; } - /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start. */ - public double getLastElapsedTime(ProfilingEvent stage) { - if (stage == ProfilingEvent.START) { - return 0; - } - return (currentTimes[stage.ordinal()] - currentTimes[stage.ordinal() - 1]) / MILLION; + /** + * Acknowledges last profile report, so that a new one can be generated.<br/> + * <b>Note: </b>Only needed if multiple threads executes the same Aparapi kernel on the same Aparapi device, + * concurrently or not. + * After calling this method the last report data will be invalidated. + */ + public void acknowledgeLastReport() { + if (lastReport != null) { + synchronized (lock) { + lastReport = null; + threadId = NO_THREAD; + } + } + } + + private ProfileReport createProfileReport() { + final ProfileReport report = new ProfileReport(invocationCount, kernel, device, threadId, currentTimes); + + return report; } + public double getLastElapsedTime(int stage) { + return lastReport == null ? Double.NaN : lastReport.getElapsedTime(stage); + } + /** Elapsed time for all events {@code from} through {@code to}.*/ - public double getLastElapsedTime(ProfilingEvent from, ProfilingEvent to) { - return (currentTimes[to.ordinal()] - currentTimes[from.ordinal()]) / MILLION; + public double getLastElapsedTime(int from, int to) { + return lastReport == null ? Double.NaN : lastReport.getElapsedTime(from, to); + } + + public long getLastThreadId() { + return lastReport == null ? NO_THREAD : lastReport.getThreadId(); } + public ProfileReport getLastReport() { + return lastReport; + } + /** Elapsed time for a single event only, i.e. since the previous stage rather than from the start, summed over all executions. */ public double getCumulativeElapsedTime(ProfilingEvent stage) { return (accumulatedTimes[stage.ordinal()]) / MILLION; } - + /** Elapsed time of entire execution, summed over all executions. */ public double getCumulativeElapsedTimeAll() { double sum = 0; @@ -144,7 +201,7 @@ public class KernelDeviceProfile { appendRowHeaders(builder, device.getShortDescription(), String.valueOf(invocationCount)); for (int i = 1; i < currentTimes.length; ++i) { ProfilingEvent stage = ProfilingEvent.values()[i]; - double time = getLastElapsedTime(stage); + double time = getLastElapsedTime(stage.ordinal()); total += time; String formatted = format.format(time); appendCell(builder, formatted); diff --git a/src/main/java/com/aparapi/internal/kernel/KernelProfile.java b/src/main/java/com/aparapi/internal/kernel/KernelProfile.java index 017ce761..03110ebc 100644 --- a/src/main/java/com/aparapi/internal/kernel/KernelProfile.java +++ b/src/main/java/com/aparapi/internal/kernel/KernelProfile.java @@ -27,26 +27,33 @@ import java.util.logging.*; */ public class KernelProfile { - private static final double MILLION = 1000000d; + public static final double MILLION = 1000000d; + private static final long NO_THREAD = -1; private static Logger logger = Logger.getLogger(Config.getLoggerName()); private final Class<? extends Kernel> kernelClass; private LinkedHashMap<Device, KernelDeviceProfile> deviceProfiles = new LinkedHashMap<>(); private Device currentDevice; private Device lastDevice; private KernelDeviceProfile currentDeviceProfile; + private IProfileReportObserver observer; public KernelProfile(Class<? extends Kernel> _kernelClass) { kernelClass = _kernelClass; } + public long getLastThreadId() { + KernelDeviceProfile lastDeviceProfile = getLastDeviceProfile(); + return lastDeviceProfile == null ? NO_THREAD : lastDeviceProfile.getLastThreadId(); + } + public double getLastExecutionTime() { KernelDeviceProfile lastDeviceProfile = getLastDeviceProfile(); - return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START, ProfilingEvent.EXECUTED) / MILLION; + return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.EXECUTED.ordinal()); } public double getLastConversionTime() { KernelDeviceProfile lastDeviceProfile = getLastDeviceProfile(); - return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START, ProfilingEvent.PREPARE_EXECUTE) / MILLION; + return lastDeviceProfile == null ? Double.NaN : lastDeviceProfile.getLastElapsedTime(ProfilingEvent.START.ordinal(), ProfilingEvent.PREPARE_EXECUTE.ordinal()); } public double getAccumulatedTotalTime() { @@ -64,15 +71,16 @@ public class KernelProfile { } void onStart(Device device) { - currentDevice = device; synchronized (deviceProfiles) { currentDeviceProfile = deviceProfiles.get(device); if (currentDeviceProfile == null) { - currentDeviceProfile = new KernelDeviceProfile(kernelClass, device); + currentDeviceProfile = new KernelDeviceProfile(this, kernelClass, device); deviceProfiles.put(device, currentDeviceProfile); } } - currentDeviceProfile.onEvent(ProfilingEvent.START); + if (currentDeviceProfile.onEvent(ProfilingEvent.START)) { + currentDevice = device; + } } void onEvent(ProfilingEvent event) { @@ -118,4 +126,12 @@ public class KernelProfile { public KernelDeviceProfile getDeviceProfile(Device device) { return deviceProfiles.get(device); } + + public void setReportObserver(IProfileReportObserver _observer) { + observer = _observer; + } + + public IProfileReportObserver getReportObserver() { + return observer; + } } diff --git a/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java b/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java index bf0d5b55..bdf30c8e 100644 --- a/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java +++ b/src/main/java/com/aparapi/internal/kernel/ProfilingEvent.java @@ -15,9 +15,30 @@ */ package com.aparapi.internal.kernel; +import java.util.concurrent.atomic.AtomicReference; + /** * Created by Barney on 02/09/2015. */ public enum ProfilingEvent { - START, CLASS_MODEL_BUILT, INIT_JNI, OPENCL_GENERATED, OPENCL_COMPILED, PREPARE_EXECUTE, EXECUTED + START, CLASS_MODEL_BUILT, INIT_JNI, OPENCL_GENERATED, OPENCL_COMPILED, PREPARE_EXECUTE, EXECUTED; + + + static final AtomicReference<String[]> stagesNames = new AtomicReference<String[]>(null); + public static String[] getStagesNames() { + String[] result = null; + result = stagesNames.get(); + if (result == null) { + final String[] names = new String[values().length]; + for (int i = 0; i < values().length; i++) { + names[i] = values()[i].name(); + } + if (stagesNames.compareAndSet(null, names)) { + result = names; + } else { + result = stagesNames.get(); + } + } + return result; + } } diff --git a/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java b/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java new file mode 100644 index 00000000..36959d1e --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ProfileReportBackwardsCompatTest.java @@ -0,0 +1,285 @@ +package com.aparapi.runtime; + +import static org.junit.Assume.assumeTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.aparapi.Kernel; +import com.aparapi.ProfileReport; +import com.aparapi.Range; +import com.aparapi.device.Device; +import com.aparapi.device.JavaDevice; +import com.aparapi.device.OpenCLDevice; +import com.aparapi.internal.kernel.KernelManager; + +/** + * Provides integration tests to help assure backwards compatibility under single thread per kernel per device + * execution environments. + * + * @author CoreRasurae + * + */ +public class ProfileReportBackwardsCompatTest { + private static OpenCLDevice openCLDevice; + + @Rule + public TestName name = new TestName(); + + + private class CLKernelManager extends KernelManager { + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.ACC, Device.TYPE.GPU, Device.TYPE.CPU); + } + } + + private class JTPKernelManager extends KernelManager { + private JTPKernelManager() { + LinkedHashSet<Device> preferredDevices = new LinkedHashSet<Device>(1); + preferredDevices.add(JavaDevice.THREAD_POOL); + setDefaultPreferredDevices(preferredDevices); + } + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.JTP); + } + } + + @Before + public void setUpBefore() throws Exception { + KernelManager.setKernelManager(new CLKernelManager()); + Device device = KernelManager.instance().bestDevice(); + if (device == null || !(device instanceof OpenCLDevice)) { + System.out.println("!!!No OpenCLDevice available for running the integration test - test will be skipped"); + } + assumeTrue (device != null && device instanceof OpenCLDevice); + openCLDevice = (OpenCLDevice) device; + } + + + /** + * This integration test validates that previous Kernel methods for retrieving profiling data + * are still consistent in the new implementation and with the new profile reports. + */ + @Test + public void sequentialSingleThreadOpenCLTest() { + System.out.println("Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(sequentialSingleThreadTestHelper(openCLDevice, 128)); + } + + /** + * This integration test validates that previous Kernel methods for retrieving profiling data + * are still consistent in the new implementation and with the new profile reports. + */ + @Test + public void sequentialSingleThreadJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(sequentialSingleThreadTestHelper(device, 32)); + } + + + public boolean sequentialSingleThreadTestHelper(Device device, int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + double accumulatedExecutionTime = 0.0; + double lastExecutionTime = 0.0; + double lastConversionTime = 0.0; + final Basic1Kernel kernel = new Basic1Kernel(); + + int[] outputArray = null; + Range range = device.createRange(size, size); + long startOfExecution = System.currentTimeMillis(); + try { + for (int i = 0; i < runs; i++) { + outputArray = Arrays.copyOf(inputArray, inputArray.length); + kernel.setInputOuputArray(outputArray); + kernel.execute(range); + lastExecutionTime = kernel.getExecutionTime(); + accumulatedExecutionTime += lastExecutionTime; + lastConversionTime = kernel.getConversionTime(); + } + long runTime = System.currentTimeMillis() - startOfExecution; + ProfileReport report = kernel.getProfileReport(device); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", accumulatedExecutionTime, kernel.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", runTime, accumulatedExecutionTime, 100); + assertTrue(validateBasic1Kernel(inputArray, outputArray)); + } finally { + kernel.dispose(); + } + + return true; + } + + private class TestData { + private int[] outputArray; + private double accumulatedExecutionTime = 0.0; + private double lastExecutionTime = 0.0; + private double lastConversionTime = 0.0; + private long startOfExecution = 0; + private long runTime = 0; + } + + /** + * This test executes two threads one for each kernel on an OpenCL device and checks that the traditional Aparapi profiling interfaces work. + */ + @Test + public void threadedSingleThreadPerKernelOpenCLTest() { + System.out.println("Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(threadedSingleThreadPerKernelTestHelper(openCLDevice, 128)); + } + + /** + * This test executes two threads one for each kernel on Java Thread Pool and checks that the traditional Aparapi profiling interfaces work. + */ + @Test + public void threadedSingleThreadPerKernelJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(threadedSingleThreadPerKernelTestHelper(device, 16)); + } + + public boolean threadedSingleThreadPerKernelTestHelper(Device device, final int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + + final Basic1Kernel kernel1 = new Basic1Kernel(); + final Basic1Kernel kernel2 = new Basic2Kernel(); + List<Basic1Kernel> kernels = new ArrayList<Basic1Kernel>(2); + kernels.add(kernel1); + kernels.add(kernel2); + + final TestData[] results = new TestData[2]; + results[0] = new TestData(); + results[1] = new TestData(); + + boolean terminatedOk = false; + try { + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + kernels.forEach(k -> executorService.submit(() -> { + results[k.getId() - 1].startOfExecution = System.currentTimeMillis(); + for (int i = 0; i < runs; i++) { + results[k.getId() - 1].outputArray = Arrays.copyOf(inputArray, inputArray.length); + k.setInputOuputArray(results[k.getId() - 1].outputArray); + k.execute(Range.create(device, size, size)); + results[k.getId() - 1].lastExecutionTime = k.getExecutionTime(); + results[k.getId() - 1].accumulatedExecutionTime += results[k.getId() - 1].lastExecutionTime; + results[k.getId() - 1].lastConversionTime = k.getConversionTime(); + } + results[k.getId() - 1].runTime = System.currentTimeMillis() - results[k.getId() - 1].startOfExecution; + })); + } finally { + executorService.shutdown(); + try { + terminatedOk = executorService.awaitTermination(5, TimeUnit.MINUTES); + } catch (InterruptedException ex) { + //For the purposes of the test this suffices + terminatedOk = false; + } + if (!terminatedOk) { + executorService.shutdownNow(); + } + } + + assertTrue(terminatedOk); + + //Validate kernel1 reports + ProfileReport report = kernel1.getProfileReport(device); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", results[0].accumulatedExecutionTime, kernel1.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", results[0].lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", results[0].lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", results[0].runTime, results[0].accumulatedExecutionTime, 100); + assertTrue(validateBasic1Kernel(inputArray, results[0].outputArray)); + + //Validate kernel2 reports + report = kernel2.getProfileReport(device); + assertEquals("Number of profiling reports doesn't match the expected", runs, report.getReportId()); + assertEquals("Aparapi Accumulated execution time doesn't match", results[1].accumulatedExecutionTime, kernel2.getAccumulatedExecutionTime(), 1e-10); + assertEquals("Aparapi last execution time doesn't match last report", results[1].lastExecutionTime, report.getExecutionTime(), 1e-10); + assertEquals("Aparapi last conversion time doesn't match last report", results[1].lastConversionTime, report.getConversionTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", results[1].runTime, results[1].accumulatedExecutionTime, 100); + assertTrue(validateBasic2Kernel(inputArray, results[1].outputArray)); + } finally { + kernel1.dispose(); + kernel2.dispose(); + } + + return true; + } + + private boolean validateBasic1Kernel(final int[] inputArray, final int[] resultArray) { + int[] expecteds = Arrays.copyOf(inputArray, inputArray.length); + for (int threadId = 0; threadId < inputArray.length; threadId++) { + expecteds[threadId] += threadId; + } + + assertArrayEquals(expecteds, resultArray); + + return true; + } + + private boolean validateBasic2Kernel(final int[] inputArray, final int[] resultArray) { + int[] expecteds = Arrays.copyOf(inputArray, inputArray.length); + for (int threadId = 0; threadId < inputArray.length; threadId++) { + expecteds[threadId] += threadId+1; + } + + assertArrayEquals(expecteds, resultArray); + + return true; + } + + private class Basic1Kernel extends Kernel { + protected int[] workArray; + + @NoCL + public void setInputOuputArray(int[] array) { + workArray = array; + } + + @NoCL + public int getId() { + return 1; + } + + @Override + public void run() { + int id = getLocalId(); + + workArray[id]+=id; + } + } + + private class Basic2Kernel extends Basic1Kernel { + @Override + @NoCL + public int getId() { + return 2; + } + + @Override + public void run() { + int id = getLocalId(); + + workArray[id]+=id+1; + } + } + +} diff --git a/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java b/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java new file mode 100644 index 00000000..8fbc882d --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ProfileReportNewAPITest.java @@ -0,0 +1,339 @@ +package com.aparapi.runtime; + +import static org.junit.Assume.assumeTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import com.aparapi.IProfileReportObserver; +import com.aparapi.Kernel; +import com.aparapi.ProfileReport; +import com.aparapi.Range; +import com.aparapi.device.Device; +import com.aparapi.device.JavaDevice; +import com.aparapi.device.OpenCLDevice; +import com.aparapi.internal.kernel.KernelManager; + +/** + * Provides integration tests to help in assuring that new APIs for ProfileReports are working, + * in single threaded and multi-threaded environments. + * + * @author CoreRasurae + */ +public class ProfileReportNewAPITest { + + private static OpenCLDevice openCLDevice; + + @Rule + public TestName name = new TestName(); + + + private class CLKernelManager extends KernelManager { + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.ACC, Device.TYPE.GPU, Device.TYPE.CPU); + } + } + + private class JTPKernelManager extends KernelManager { + private JTPKernelManager() { + LinkedHashSet<Device> preferredDevices = new LinkedHashSet<Device>(1); + preferredDevices.add(JavaDevice.THREAD_POOL); + setDefaultPreferredDevices(preferredDevices); + } + @Override + protected List<Device.TYPE> getPreferredDeviceTypes() { + return Arrays.asList(Device.TYPE.JTP); + } + } + + @Before + public void setUpBefore() throws Exception { + KernelManager.setKernelManager(new CLKernelManager()); + Device device = KernelManager.instance().bestDevice(); + if (device == null || !(device instanceof OpenCLDevice)) { + System.out.println("!!!No OpenCLDevice available for running the integration test - test will be skipped"); + } + assumeTrue (device != null && device instanceof OpenCLDevice); + openCLDevice = (OpenCLDevice) device; + } + + /** + * Tests the ProfileReport observer interface in a single threaded, single kernel environment running on + * an OpenCL device. + */ + @Test + public void singleThreadedSingleKernelObserverOpenCLTest() { + System.out.println("Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(singleThreadedSingleKernelReportObserverTestHelper(openCLDevice, 128)); + } + + /** + * Tests the ProfileReport observer interface in a single threaded, single kernel environment running on + * Java Thread Pool. + */ + @Test + public void singleThreadedSingleKernelObserverJTPTest() { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(singleThreadedSingleKernelReportObserverTestHelper(device, 16)); + } + + private class ReportObserverNoConcurrentCalls implements IProfileReportObserver { + private final ConcurrentSkipListSet<Long> expectedThreadsIds = new ConcurrentSkipListSet<>(); + private final ConcurrentSkipListSet<Long> observedThreadsIds = new ConcurrentSkipListSet<>(); + private final AtomicInteger atomicSimultaneousCalls = new AtomicInteger(0); + private final Device device; + private long receivedReportsCount = 0; + private double accumulatedElapsedTime = 0.0; + + private ReportObserverNoConcurrentCalls(Device _device) { + device = _device; + } + + private void addAcceptedThreadId(long threadId) { + expectedThreadsIds.add(threadId); + } + + private double getAccumulatedElapsedTime() { + return accumulatedElapsedTime; + } + + private long getReceivedReportsCount() { + return receivedReportsCount; + } + + @Override + public void receiveReport(Class<? extends Kernel> kernelClass, Device _device, ProfileReport profileInfo) { + int currentSimultaneousCalls = atomicSimultaneousCalls.incrementAndGet(); + assertTrue("Observer was called concurrently", currentSimultaneousCalls == 1); + accumulatedElapsedTime += profileInfo.getExecutionTime(); + assertEquals("Kernel class does not match", Basic1Kernel.class, kernelClass); + assertEquals("Device does not match", device, _device); + boolean isThreadAccepted = expectedThreadsIds.contains(profileInfo.getThreadId()); + assertTrue("Thread generating the report (" + profileInfo.getThreadId() + + ") is not among the accepted ones: " + expectedThreadsIds.toString(), isThreadAccepted); + ++receivedReportsCount; + assertEquals("Received report count doesn't match current ID - reports were lost", receivedReportsCount, profileInfo.getReportId()); + observedThreadsIds.add(profileInfo.getThreadId()); + atomicSimultaneousCalls.decrementAndGet(); + } + } + + public boolean singleThreadedSingleKernelReportObserverTestHelper(Device device, int size) { + final int runs = 100; + final int inputArray[] = new int[size]; + final Basic1Kernel kernel = new Basic1Kernel(); + + int[] outputArray = null; + Range range = device.createRange(size, size); + + ReportObserverNoConcurrentCalls observer = new ReportObserverNoConcurrentCalls(device); + observer.addAcceptedThreadId(Thread.currentThread().getId()); + kernel.registerProfileReportObserver(observer); + + long startOfExecution = System.currentTimeMillis(); + try { + for (int i = 0; i < runs; i++) { + outputArray = Arrays.copyOf(inputArray, inputArray.length); + kernel.setInputOuputArray(outputArray); + kernel.execute(range); + } + + long runTime = System.currentTimeMillis() - startOfExecution; + assertEquals("Number of profiling reports doesn't match the expected", runs, observer.getReceivedReportsCount()); + assertEquals("Aparapi Accumulated execution time doesn't match", kernel.getAccumulatedExecutionTime(device), observer.getAccumulatedElapsedTime(), 1e-10); + assertEquals("Test estimated accumulated time doesn't match within 100ms window", runTime, kernel.getAccumulatedExecutionTime(device), 100); + assertTrue(validateBasic1Kernel(inputArray, outputArray)); + } finally { + kernel.dispose(); + } + + return true; + } + + /** + * Tests the ProfileReport observer interface in a multi threaded, single kernel environment running on + * an OpenCL device. + */ + @Test + public void multiThreadedSingleKernelObserverOpenCLTest() throws Exception { + System.out.println("Test " + name.getMethodName() + " - Executing on device: " + openCLDevice.getShortDescription() + " - " + openCLDevice.getName()); + assertTrue(multiThreadedSingleKernelReportObserverTestHelper(openCLDevice, 128)); + } + + /** + * Tests the ProfileReport observer interface in a multi threaded, single kernel environment running on + * Java Thread Pool. + */ + @Test + public void multiThreadedSingleKernelObserverJTPTest() throws Exception { + KernelManager.setKernelManager(new JTPKernelManager()); + Device device = KernelManager.instance().bestDevice(); + assertTrue(multiThreadedSingleKernelReportObserverTestHelper(device, 16)); + } + + private class ThreadResults { + private long startOfExecution; + private long runTime; + private long threadId; + private int kernelCalls; + private int[] outputArray; + } + + @SuppressWarnings("unchecked") + public boolean multiThreadedSingleKernelReportObserverTestHelper(Device device, int size) throws InterruptedException, ExecutionException { + final int runs = 100; + final int javaThreads = 10; + final int inputArray[] = new int[size]; + final AtomicInteger atomicResultId = new AtomicInteger(0); + final CyclicBarrier barrier = new CyclicBarrier(javaThreads); + ExecutorService executorService = Executors.newFixedThreadPool(javaThreads); + + final ReportObserverNoConcurrentCalls observer = new ReportObserverNoConcurrentCalls(device); + + final List<Basic1Kernel> kernels = new ArrayList<Basic1Kernel>(javaThreads); + for (int i = 0; i < javaThreads; i++) { + final Basic1Kernel kernel = new Basic1Kernel(); + kernel.registerProfileReportObserver(observer); + kernels.add(kernel); + } + + final ThreadResults[] results = new ThreadResults[javaThreads]; + for (int i = 0; i < results.length; i++) { + results[i] = new ThreadResults(); + } + + boolean terminatedOk = false; + try { + List<Future<Runnable>> futures = new ArrayList<>(javaThreads); + for (Basic1Kernel k : kernels) { + futures.add((Future<Runnable>)executorService.submit(new Runnable() { + @Override + public void run() { + int id = atomicResultId.getAndIncrement(); + results[id].threadId = Thread.currentThread().getId(); + observer.addAcceptedThreadId(results[id].threadId); + results[id].startOfExecution = System.currentTimeMillis(); + results[id].kernelCalls = 0; + for (int i = 0; i < runs; i++) { + results[id].outputArray = Arrays.copyOf(inputArray, inputArray.length); + k.setInputOuputArray(results[id].outputArray); + k.execute(Range.create(device, size, size)); + results[id].kernelCalls++; + if (i == 0) { + //Ensure that each thread sends at least one report + boolean retry = true; + while (retry) { + try { + barrier.await(10, TimeUnit.SECONDS); + retry = false; + } catch (InterruptedException e) { + retry = true; + } catch (BrokenBarrierException e) { + throw new RuntimeException("Failed on barrier", e); + } catch (TimeoutException e) { + throw new RuntimeException("Failed on barrier", e); + } + } + } + } + results[id].runTime = System.currentTimeMillis() - results[id].startOfExecution; + } + })); + } + for (Future<Runnable> future : futures) { + future.get(); + } + } finally { + executorService.shutdown(); + try { + terminatedOk = executorService.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException ex) { + //For the purposes of the test this suffices + terminatedOk = false; + } + if (!terminatedOk) { + executorService.shutdownNow(); + } + } + + assertTrue("Threads did not terminate correctly", terminatedOk); + + int totalNumberOfCalls = 0; + double minExecutionTime = Double.MAX_VALUE; + double maxExecutionTime = 0; + for (int i = 0; i < javaThreads; i++) { + totalNumberOfCalls += results[i].kernelCalls; + maxExecutionTime = Math.max(maxExecutionTime, results[i].runTime); + minExecutionTime = Math.min(minExecutionTime, results[i].runTime); + } + + //Sometimes on slower machines, it may happen that the observer doesn't receive as many reports as the number of kernel runs. + assertTrue("Number of total reports must be more than the iteration count", observer.getReceivedReportsCount() >= runs); + assertTrue("Number of reports is less than the total number of calls", observer.getReceivedReportsCount() <= totalNumberOfCalls); + assertTrue("Number of Java threads sending profile reports should be at least 1", observer.observedThreadsIds.size() >= 1); + if (device instanceof OpenCLDevice) { + //It is expected that the observer accumulated elapsed time is less than the estimated execution time, which includes additional test execution overhead. + //On JTP this difference can become bigger, so Java devices are excluded from this check. + assertEquals("Report execution time doesn't match within 300ms of real min. kernel execution time", minExecutionTime, observer.getAccumulatedElapsedTime(), 300); + assertEquals("Report execution time doesn't match within 300ms of real max. kernel execution time", maxExecutionTime, observer.getAccumulatedElapsedTime(), 300); + } + for (int i = 0; i < javaThreads; i++) { + assertEquals("Thread index " + i + " didn't make the expected number of kernel runs", runs, results[i].kernelCalls); + assertTrue("Thread index " + i + " kernel computation doesn't match the expected", validateBasic1Kernel(inputArray, results[i].outputArray)); + } + + return true; + } + + private boolean validateBasic1Kernel(final int[] inputArray, final int[] resultArray) { + int[] expecteds = Arrays.copyOf(inputArray, inputArray.length); + for (int threadId = 0; threadId < inputArray.length; threadId++) { + expecteds[threadId] += threadId; + } + + assertArrayEquals(expecteds, resultArray); + + return true; + } + + private class Basic1Kernel extends Kernel { + protected int[] workArray; + + @NoCL + public void setInputOuputArray(int[] array) { + workArray = array; + } + + @NoCL + public int getId() { + return 1; + } + + @Override + public void run() { + int id = getLocalId(); + + workArray[id]+=id; + } + } +} diff --git a/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java b/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java new file mode 100644 index 00000000..480b09b6 --- /dev/null +++ b/src/test/java/com/aparapi/runtime/ProfileReportUnitTest.java @@ -0,0 +1,263 @@ +package com.aparapi.runtime; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import com.aparapi.IProfileReportObserver; +import com.aparapi.Kernel; +import com.aparapi.ProfileReport; +import com.aparapi.device.Device; +import com.aparapi.device.JavaDevice; +import com.aparapi.internal.kernel.KernelDeviceProfile; +import com.aparapi.internal.kernel.KernelProfile; +import com.aparapi.internal.kernel.ProfilingEvent; + +/** + * This class provides unit tests to help in validation of thread-safe ProfileReports. + * + * @author CoreRasurae + */ +public class ProfileReportUnitTest { + + private class SimpleKernel extends Kernel { + + @Override + public void run() { + } + } + + /** + * This test validates that no thread can start a profiling process at any stage after another thread + * that has already started profiling. + * @throws Exception + */ + @Test + public void testNoThreadCanStartPass() throws Exception { + final int javaThreads = ProfilingEvent.values().length; + final int runs = 100; + final KernelProfile kernelProfile = new KernelProfile(SimpleKernel.class); + final KernelDeviceProfile kernelDeviceProfile = new KernelDeviceProfile(kernelProfile, SimpleKernel.class, JavaDevice.THREAD_POOL); + final AtomicBoolean receivedReport = new AtomicBoolean(false); + final AtomicBoolean[] onEventAccepted = new AtomicBoolean[javaThreads]; + final AtomicInteger idx = new AtomicInteger(0); + for (int i = 0; i < javaThreads; i++) { + onEventAccepted[i] = new AtomicBoolean(false); + } + + kernelProfile.setReportObserver(new IProfileReportObserver() { + @Override + public void receiveReport(Class<? extends Kernel> kernelClass, Device device, ProfileReport profileInfo) { + receivedReport.set(true); + } + }); + + + //This is the only thread that should start an event in this test + assertTrue(kernelDeviceProfile.onEvent(ProfilingEvent.START)); + + + List<ProfilingEvent> events = Arrays.asList(ProfilingEvent.values()); + + ExecutorService executorService = Executors.newFixedThreadPool(javaThreads); + try { + events.forEach(evt -> executorService.submit(() -> { + for (int i = 0; i < runs; i++) { + if (kernelDeviceProfile.onEvent(evt)) { + onEventAccepted[idx.getAndIncrement()].set(true); + } + } + })); + } finally { + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + throw new Exception("ExecutorService terminated abnormaly"); + } + } + + for (int i = 0; i < javaThreads; i++) { + assertFalse("Event was accepted for thread with index " + i, onEventAccepted[i].get()); + } + assertFalse("No report should have been received", receivedReport.get()); + } + + + /** + * This test validates that only a single thread can start, after being acknowledged, and that report is received. + * @throws Exception + */ + @Test + public void testNoThreadStartsBeforeAckPass() throws Exception { + final int javaThreads = ProfilingEvent.values().length + 1; + final int runs = 100; + final KernelProfile kernelProfile = new KernelProfile(SimpleKernel.class); + final KernelDeviceProfile kernelDeviceProfile = new KernelDeviceProfile(kernelProfile, SimpleKernel.class, JavaDevice.THREAD_POOL); + final AtomicBoolean receivedReport = new AtomicBoolean(false); + final AtomicBoolean[] onEventAccepted = new AtomicBoolean[javaThreads]; + final AtomicInteger index = new AtomicInteger(0); + for (int i = 0; i < javaThreads; i++) { + onEventAccepted[i] = new AtomicBoolean(false); + } + + kernelProfile.setReportObserver(new IProfileReportObserver() { + @Override + public void receiveReport(Class<? extends Kernel> kernelClass, Device device, ProfileReport profileInfo) { + receivedReport.set(true); + for (int i = 0; i < javaThreads; i++) { + assertFalse("Event was accepted earlier for thread with index " + i, onEventAccepted[i].get()); + } + } + }); + + + //This is the only thread that should start an event in this test + assertTrue(kernelDeviceProfile.onEvent(ProfilingEvent.START)); + assertTrue(kernelDeviceProfile.onEvent(ProfilingEvent.EXECUTED)); + + List<ProfilingEvent> events = new ArrayList<>(ProfilingEvent.values().length + 1); + events.addAll(Arrays.asList(ProfilingEvent.values())); + events.add(ProfilingEvent.START); + + ExecutorService executorService = Executors.newFixedThreadPool(javaThreads); + try { + events.forEach(evt -> { + final int idx = index.getAndIncrement(); + executorService.submit(() -> { + for (int i = 0; i < runs; i++) { + if (kernelDeviceProfile.onEvent(evt)) { + onEventAccepted[idx].set(true); + } + } + }); + }); + } finally { + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + throw new Exception("ExecutorService terminated abnormaly"); + } + } + + boolean accepted = false; + for (int i = 0; i < javaThreads; i++) { + if (i == 0) { + accepted = onEventAccepted[i].get(); + } else if (i == ProfilingEvent.values().length-1) { + if (accepted == false) { + assertTrue("Event should have been accepted", onEventAccepted[i].get()); + } else { + assertFalse("Event shouldn't have been accepted", onEventAccepted[i].get()); + } + } else { + assertFalse("Event was accepted on wrong event stage for thread with index " + i, onEventAccepted[i].get()); + } + } + assertTrue("Event should have been accepted by another thread", accepted); + assertTrue("Report should have been received", receivedReport.get()); + } + + /** + * This test validates that another thread cannot start a profile, before the current being acknowledged, when + * no observer is registered. + * @throws Exception + */ + @Test + public void testAcknowledgeAllowsOtherThreadToRunPass() throws Exception { + final int javaThreads = ProfilingEvent.values().length + 1; + final int runs = 100; + final KernelProfile kernelProfile = new KernelProfile(SimpleKernel.class); + final KernelDeviceProfile kernelDeviceProfile = new KernelDeviceProfile(kernelProfile, SimpleKernel.class, JavaDevice.THREAD_POOL); + final AtomicBoolean[] onEventAccepted = new AtomicBoolean[javaThreads]; + final AtomicInteger index = new AtomicInteger(0); + for (int i = 0; i < javaThreads; i++) { + onEventAccepted[i] = new AtomicBoolean(false); + } + + //This is the only thread that should start an event in this test + assertTrue(kernelDeviceProfile.onEvent(ProfilingEvent.START)); + assertTrue(kernelDeviceProfile.onEvent(ProfilingEvent.EXECUTED)); + + List<ProfilingEvent> events = new ArrayList<>(ProfilingEvent.values().length + 1); + events.addAll(Arrays.asList(ProfilingEvent.values())); + events.add(ProfilingEvent.START); + + final ExecutorService executorService = Executors.newFixedThreadPool(javaThreads); + try { + events.forEach(evt -> { + final int idx = index.getAndIncrement(); + executorService.submit(() -> { + for (int i = 0; i < runs; i++) { + if (kernelDeviceProfile.onEvent(evt)) { + onEventAccepted[idx].set(true); + } + } + }); + }); + } finally { + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + throw new Exception("ExecutorService terminated abnormaly"); + } + } + + for (int i = 0; i < javaThreads; i++) { + assertFalse("Event was accepted for thread with index " + i, onEventAccepted[i].get()); + } + + ProfileReport report = kernelDeviceProfile.getLastReport(); + assertNotNull("Profile report shouldn't be null", report); + assertEquals("Thread Id doesn't match the expected", Thread.currentThread().getId(), report.getThreadId()); + assertEquals("Device doesn't match", report.getDevice(), JavaDevice.THREAD_POOL); + assertEquals("Class doesn't match", report.getKernelClass(), SimpleKernel.class); + + kernelDeviceProfile.acknowledgeLastReport(); + + index.set(0); + final ExecutorService executorServiceB = Executors.newFixedThreadPool(javaThreads); + try { + events.forEach(evt -> { + final int idx = index.getAndIncrement(); + executorServiceB.submit(() -> { + for (int i = 0; i < runs; i++) { + if (kernelDeviceProfile.onEvent(evt)) { + onEventAccepted[idx].set(true); + } + } + }); + }); + } finally { + executorServiceB.shutdown(); + if (!executorServiceB.awaitTermination(1, TimeUnit.MINUTES)) { + executorServiceB.shutdownNow(); + throw new Exception("ExecutorService terminated abnormaly"); + } + } + + boolean accepted = false; + for (int i = 0; i < javaThreads; i++) { + if (i == 0) { + accepted = onEventAccepted[i].get(); + } else if (i == ProfilingEvent.values().length-1) { + if (accepted == false) { + assertTrue("Event should have been accepted", onEventAccepted[i].get()); + } else { + assertFalse("Event shouldn't have been accepted", onEventAccepted[i].get()); + } + } else { + assertFalse("Event was accepted on wrong event stage for thread with index " + i, onEventAccepted[i].get()); + } + } + assertTrue("Event should have been accepted by another thread", accepted); + } +} -- GitLab