util/u_queue: track job size and limit the size of queue growth
When both UTIL_QUEUE_INIT_RESIZE_IF_FULL and UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY are set, we can get into a situation where the queue never executes and grows to a huge size due to all other threads being busy. This is the case with the shader cache when attempting to compile a huge number of shaders up front. If all threads are busy compiling shaders the cache queues memory use can climb into the many GBs very fast. The use of these two flags with the shader cache is intended to allow shaders compiled at runtime to be compiled as fast as possible. To avoid huge memory use but still allow the queue to perform optimally in the run time compilation case, we now add the ability to track memory consumed by the jobs in the queue and limit it to a hardcoded 256MB which should be more than enough. Reviewed-by: Marek Olšák <marek.olsak@amd.com>
This commit is contained in:
@@ -116,7 +116,7 @@ tc_batch_flush(struct threaded_context *tc)
|
||||
}
|
||||
|
||||
util_queue_add_job(&tc->queue, next, &next->fence, tc_batch_execute,
|
||||
NULL);
|
||||
NULL, 0);
|
||||
tc->last = tc->next;
|
||||
tc->next = (tc->next + 1) % TC_MAX_BATCHES;
|
||||
}
|
||||
|
@@ -336,7 +336,7 @@ batch_flush(struct fd_batch *batch)
|
||||
|
||||
util_queue_add_job(&batch->ctx->flush_queue,
|
||||
batch, &batch->flush_fence,
|
||||
batch_flush_func, batch_cleanup_func);
|
||||
batch_flush_func, batch_cleanup_func, 0);
|
||||
} else {
|
||||
fd_gmem_render_tiles(batch);
|
||||
batch_reset_resources(batch);
|
||||
|
@@ -2358,7 +2358,8 @@ current_not_ready:
|
||||
/* Compile it asynchronously. */
|
||||
util_queue_add_job(&sscreen->shader_compiler_queue_low_priority,
|
||||
shader, &shader->ready,
|
||||
si_build_shader_variant_low_priority, NULL);
|
||||
si_build_shader_variant_low_priority, NULL,
|
||||
0);
|
||||
|
||||
/* Add only after the ready fence was reset, to guard against a
|
||||
* race with si_bind_XX_shader. */
|
||||
@@ -2615,7 +2616,7 @@ void si_schedule_initial_compile(struct si_context *sctx, unsigned processor,
|
||||
}
|
||||
|
||||
util_queue_add_job(&sctx->screen->shader_compiler_queue, job,
|
||||
ready_fence, execute, NULL);
|
||||
ready_fence, execute, NULL, 0);
|
||||
|
||||
if (debug) {
|
||||
util_queue_fence_wait(ready_fence);
|
||||
|
@@ -1756,7 +1756,7 @@ static int amdgpu_cs_flush(struct radeon_cmdbuf *rcs,
|
||||
|
||||
/* Submit. */
|
||||
util_queue_add_job(&ws->cs_queue, cs, &cs->flush_completed,
|
||||
amdgpu_cs_submit_ib, NULL);
|
||||
amdgpu_cs_submit_ib, NULL, 0);
|
||||
/* The submission has been queued, unlock the fence now. */
|
||||
simple_mtx_unlock(&ws->bo_fence_lock);
|
||||
|
||||
|
@@ -697,7 +697,7 @@ static int radeon_drm_cs_flush(struct radeon_cmdbuf *rcs,
|
||||
|
||||
if (util_queue_is_initialized(&cs->ws->cs_queue)) {
|
||||
util_queue_add_job(&cs->ws->cs_queue, cs, &cs->flush_completed,
|
||||
radeon_drm_cs_emit_ioctl_oneshot, NULL);
|
||||
radeon_drm_cs_emit_ioctl_oneshot, NULL, 0);
|
||||
if (!(flags & PIPE_FLUSH_ASYNC))
|
||||
radeon_drm_cs_sync_flush(rcs);
|
||||
} else {
|
||||
|
@@ -99,7 +99,7 @@ _mesa_glthread_init(struct gl_context *ctx)
|
||||
struct util_queue_fence fence;
|
||||
util_queue_fence_init(&fence);
|
||||
util_queue_add_job(&glthread->queue, ctx, &fence,
|
||||
glthread_thread_initialization, NULL);
|
||||
glthread_thread_initialization, NULL, 0);
|
||||
util_queue_fence_wait(&fence);
|
||||
util_queue_fence_destroy(&fence);
|
||||
}
|
||||
@@ -167,7 +167,7 @@ _mesa_glthread_flush_batch(struct gl_context *ctx)
|
||||
p_atomic_add(&glthread->stats.num_offloaded_items, next->used);
|
||||
|
||||
util_queue_add_job(&glthread->queue, next, &next->fence,
|
||||
glthread_unmarshal_batch, NULL);
|
||||
glthread_unmarshal_batch, NULL, 0);
|
||||
glthread->last = glthread->next;
|
||||
glthread->next = (glthread->next + 1) % MARSHAL_MAX_BATCHES;
|
||||
}
|
||||
|
@@ -1037,7 +1037,7 @@ disk_cache_put(struct disk_cache *cache, const cache_key key,
|
||||
if (dc_job) {
|
||||
util_queue_fence_init(&dc_job->fence);
|
||||
util_queue_add_job(&cache->cache_queue, dc_job, &dc_job->fence,
|
||||
cache_put, destroy_put_job);
|
||||
cache_put, destroy_put_job, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -33,6 +33,9 @@
|
||||
#include "util/u_thread.h"
|
||||
#include "u_process.h"
|
||||
|
||||
/* Define 256MB */
|
||||
#define S_256MB (256 * 1024 * 1024)
|
||||
|
||||
static void
|
||||
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
|
||||
bool finish_locked);
|
||||
@@ -290,6 +293,8 @@ util_queue_thread_func(void *input)
|
||||
util_queue_fence_signal(job.fence);
|
||||
if (job.cleanup)
|
||||
job.cleanup(job.job, thread_index);
|
||||
|
||||
queue->total_jobs_size -= job.job_size;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,7 +518,8 @@ util_queue_add_job(struct util_queue *queue,
|
||||
void *job,
|
||||
struct util_queue_fence *fence,
|
||||
util_queue_execute_func execute,
|
||||
util_queue_execute_func cleanup)
|
||||
util_queue_execute_func cleanup,
|
||||
const size_t job_size)
|
||||
{
|
||||
struct util_queue_job *ptr;
|
||||
|
||||
@@ -531,7 +537,8 @@ util_queue_add_job(struct util_queue *queue,
|
||||
assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
|
||||
|
||||
if (queue->num_queued == queue->max_jobs) {
|
||||
if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) {
|
||||
if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
|
||||
queue->total_jobs_size + job_size < S_256MB) {
|
||||
/* If the queue is full, make it larger to avoid waiting for a free
|
||||
* slot.
|
||||
*/
|
||||
@@ -570,7 +577,10 @@ util_queue_add_job(struct util_queue *queue,
|
||||
ptr->fence = fence;
|
||||
ptr->execute = execute;
|
||||
ptr->cleanup = cleanup;
|
||||
ptr->job_size = job_size;
|
||||
|
||||
queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
|
||||
queue->total_jobs_size += ptr->job_size;
|
||||
|
||||
queue->num_queued++;
|
||||
cnd_signal(&queue->has_queued_cond);
|
||||
@@ -642,7 +652,8 @@ util_queue_finish(struct util_queue *queue)
|
||||
|
||||
for (unsigned i = 0; i < queue->num_threads; ++i) {
|
||||
util_queue_fence_init(&fences[i]);
|
||||
util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL);
|
||||
util_queue_add_job(queue, &barrier, &fences[i],
|
||||
util_queue_finish_execute, NULL, 0);
|
||||
}
|
||||
|
||||
for (unsigned i = 0; i < queue->num_threads; ++i) {
|
||||
|
@@ -193,6 +193,7 @@ typedef void (*util_queue_execute_func)(void *job, int thread_index);
|
||||
|
||||
struct util_queue_job {
|
||||
void *job;
|
||||
size_t job_size;
|
||||
struct util_queue_fence *fence;
|
||||
util_queue_execute_func execute;
|
||||
util_queue_execute_func cleanup;
|
||||
@@ -212,6 +213,7 @@ struct util_queue {
|
||||
unsigned num_threads; /* decreasing this number will terminate threads */
|
||||
int max_jobs;
|
||||
int write_idx, read_idx; /* ring buffer pointers */
|
||||
size_t total_jobs_size; /* memory use of all jobs in the queue */
|
||||
struct util_queue_job *jobs;
|
||||
|
||||
/* for cleanup at exit(), protected by exit_mutex */
|
||||
@@ -230,7 +232,8 @@ void util_queue_add_job(struct util_queue *queue,
|
||||
void *job,
|
||||
struct util_queue_fence *fence,
|
||||
util_queue_execute_func execute,
|
||||
util_queue_execute_func cleanup);
|
||||
util_queue_execute_func cleanup,
|
||||
const size_t job_size);
|
||||
void util_queue_drop_job(struct util_queue *queue,
|
||||
struct util_queue_fence *fence);
|
||||
|
||||
|
Reference in New Issue
Block a user