// A library to parallelize code on the Mellanox TILE-Gx easily.
// Written by Nils Liaaen Corneliusen 2017.
// License: CC0 1.0 Universal (CC0 1.0) Public Domain Dedication license
// Please refer to the article at http://www.ignorantus.com/pages/tilegx_integer_raytracer/
// for more information.
#include <assert.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <tmc/cpus.h>
#include <tmc/task.h>
#include <tmc/udn.h>
#include <arch/cycle.h>

#include "par.h"

/* Worker-pool bookkeeping.
 *   par_cores_total:  number of worker threads started by par_init().
 *   par_cores_max:    cap on how many workers par_sendjob() may use.
 *   par_cores_active: workers given a job since the last par_wait(). */
unsigned int par_cores_total  = 0;
unsigned int par_cores_max    = 0;
unsigned int par_cores_active = 0;

/* Host-side cycle counters consumed by the par_measure_*() functions. */
uint64_t host_start      = 0;
uint64_t host_stop       = 0;
uint64_t host_wait_start = 0;   /* cycles par_sendjob() blocked waiting for a free worker */
uint64_t host_wait_done  = 0;   /* cycles par_wait() blocked waiting for outstanding jobs */

/* Control words the host sends to a worker in place of a job function
 * pointer; the worker compares every received command word against these. */
enum {
    THREAD_MEASURE_START  = 0xfff1,
    THREAD_MEASURE_STOP   = 0xfff3,
    THREAD_MEASURE_REPORT = 0xfff5,
    THREAD_QUIT           = 0xffff
};

/* Report how many worker cores par_init() started; the host core (core 0)
 * is not included in this count. */
unsigned int par_get_cores_total( void )
{
    const unsigned int workers = par_cores_total;
    return workers;
}

/* Dispatch one job to a worker core.
 *
 * func: job entry point; the worker calls it as func(copy_of_data, len).
 * data: job payload, copied to the worker over the UDN.
 * len:  payload size in bytes; must not exceed DATA_LEN_MAX.
 *
 * While fewer than par_cores_max workers have been given work, the next
 * unused worker (workers are numbered from core 1) is started. Otherwise
 * this blocks until a worker reports completion (workers send their own
 * core number when done) and reuses that worker. The blocking time is
 * accumulated in host_wait_start.
 */
void par_sendjob( void *func, void *data, unsigned int len )
{
    int core;

    assert( len <= DATA_LEN_MAX );

    if( par_cores_active < par_cores_max ) {
        // A worker that has not been handed a job yet is free: start it.
        par_cores_active++;
        core = (int)par_cores_active;
    } else {
        // Every permitted worker is busy: wait for the first completion.
        uint64_t t0 = par_get_cycle_count();
        core = (int)tmc_udn0_receive();
        uint64_t t1 = par_get_cycle_count();
        host_wait_start += t1 - t0;
    }

    DynamicHeader header = tmc_udn_header_from_cpu( core );

    // Job header: function pointer, then payload length in bytes.
    tmc_udn_send_2( header, UDN0_DEMUX_TAG, (uint64_t)func, len );

    // The worker reads (len+7)/8 64-bit words. Send the complete words
    // directly from the caller's buffer, and pack a trailing partial word
    // into a zero-padded local so no byte past data[len-1] is ever read.
    unsigned int words = len / 8;
    unsigned int tail  = len % 8;

    if( words > 0 )
        tmc_udn_send_buffer( header, UDN0_DEMUX_TAG, data, words );

    if( tail > 0 ) {
        uint64_t last = 0;
        memcpy( &last, (const char *)data + (size_t)words * 8, tail );
        tmc_udn_send_1( header, UDN0_DEMUX_TAG, last );
    }
}

/* Block until every worker given work since the last par_wait() has
 * reported completion, then mark the pool idle. Cycles spent blocking are
 * added to host_wait_done. Returns at once if no worker was started. */
void par_wait( void )
{
    if( !par_cores_active )
        return;

    const uint64_t begin = par_get_cycle_count();

    // par_sendjob() consumes one completion before each reuse, so exactly
    // par_cores_active completion words are still outstanding here.
    unsigned int pending = par_cores_active;
    while( pending-- > 0 )
        tmc_udn0_receive();

    const uint64_t end = par_get_cycle_count();
    host_wait_done += end - begin;

    par_cores_active = 0;
}

/* Limit how many worker cores par_sendjob() may use.
 *
 * cores: requested worker count. Values above par_cores_total are clamped
 *        to par_cores_total. 0 also selects all workers, matching
 *        par_init()'s "0 = use everything" convention.
 */
void par_set_cores( unsigned int cores )
{
    // A cap of zero would make par_sendjob() wait for a completion word
    // that no idle worker will ever send (deadlock), so map 0 to "all".
    if( cores == 0 || cores > par_cores_total )
        cores = par_cores_total;

    par_cores_max = cores;
}

/* Worker thread body; one instance runs per worker core.
 *
 * arg: the worker's core number, passed by value in the pointer. The thread
 *      pins itself to that core, activates the UDN, tells the host on core 0
 *      it is ready, then serves commands until THREAD_QUIT arrives.
 *
 * Each command word is either a THREAD_* control word or a job function
 * pointer. A job pointer is followed by a length word (bytes) and (len+7)/8
 * payload words. After running a job, the worker sends its core number to
 * the host to signal completion.
 *
 * Returns NULL.
 */
static void *par_thread( void *arg )
{
    const uint64_t coreid = (uint64_t)(uintptr_t)arg;
    uint64_t core_start = 0;
    uint64_t core_stop  = 0;
    uint64_t core_work  = 0;   // cycles spent inside job functions
    uint64_t core_wait  = 0;   // cycles spent blocked waiting for a job

    if( tmc_cpus_set_my_cpu( coreid ) != 0 )
        tmc_task_die( "Core %d: tmc_cpus_set_my_cpu() failed.", (int)coreid );

    if( tmc_udn_activate() < 0 )
        tmc_task_die( "Core %d: tmc_udn_activate() failed.", (int)coreid );

    DynamicHeader header = tmc_udn_header_from_cpu( 0 );

    // The payload is received as (len+7)/8 whole 64-bit words, so round the
    // buffer up to a word multiple; otherwise a len near DATA_LEN_MAX that
    // is not a multiple of 8 would be written past the allocation.
    void *data = malloc( ((DATA_LEN_MAX + 7) / 8) * 8 );
    if( data == NULL )
        tmc_task_die( "Core %d: malloc() failed.", (int)coreid );

    // ready to start
    tmc_udn_send_1( header, UDN0_DEMUX_TAG, coreid );

    for( ;; ) {
        uint64_t t0 = par_get_cycle_count();
        uint64_t cmd = tmc_udn0_receive();
        uint64_t t1 = par_get_cycle_count();

        if( cmd == THREAD_QUIT )
            break;

        if( cmd == THREAD_MEASURE_START ) {
            core_start = par_get_cycle_count();
            core_work  = 0;
            core_wait  = 0;
            continue;
        }

        if( cmd == THREAD_MEASURE_STOP ) {
            core_stop = par_get_cycle_count();
            continue;
        }

        if( cmd == THREAD_MEASURE_REPORT ) {
            tmc_udn_send_1( header, UDN0_DEMUX_TAG, core_stop - core_start );
            tmc_udn_send_1( header, UDN0_DEMUX_TAG, core_work );
            tmc_udn_send_1( header, UDN0_DEMUX_TAG, core_wait );
            continue;
        }

        // Only time spent waiting for real jobs counts as wait.
        core_wait += t1 - t0;

        // Any other command word is the job's function pointer.
        void (*func)( void *, int ) = (void (*)( void *, int ))(uintptr_t)cmd;

        unsigned int len = (unsigned int)tmc_udn0_receive();
        // A hard check rather than assert(): in NDEBUG builds an oversized
        // length would otherwise overflow the payload buffer.
        if( len > DATA_LEN_MAX )
            tmc_task_die( "Core %d: job length %u exceeds DATA_LEN_MAX.", (int)coreid, len );
        tmc_udn0_receive_buffer( data, (len+7)/8 );

        t0 = par_get_cycle_count();
        func( data, (int)len );
        t1 = par_get_cycle_count();

        core_work += t1 - t0;

        // say we're done
        tmc_udn_send_1( header, UDN0_DEMUX_TAG, coreid );
    }

    free( data );
    return NULL;
}

/* Start `cores` detached worker threads, one per core 1..cores.
 * Returns 0 on success, or the pthread error code of the failing call. */
static int par_spawn_workers( unsigned int cores )
{
    pthread_attr_t attr;
    int rc = pthread_attr_init( &attr );
    if( rc != 0 ) {
        // pthread functions return their error code instead of setting errno.
        fprintf( stderr, "pthread_attr_init: %s\n", strerror( rc ) );
        return rc;
    }

    // Worker thread handles are never kept or joined. Creating the workers
    // detached lets their resources be reclaimed when they exit on
    // THREAD_QUIT.
    rc = pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
    if( rc != 0 ) {
        fprintf( stderr, "pthread_attr_setdetachstate: %s\n", strerror( rc ) );
        pthread_attr_destroy( &attr );
        return rc;
    }

    // Threads are assigned from core 1 upwards; core 0 is the host.
    for( uintptr_t rank = 1; rank <= cores; rank++ ) {
        pthread_t thread_id;
        rc = pthread_create( &thread_id, &attr, &par_thread, (void *)rank );
        if( rc != 0 ) {
            fprintf( stderr, "pthread_create: %s\n", strerror( rc ) );
            break;
        }
    }

    pthread_attr_destroy( &attr );
    return rc;
}

/* Initialize the worker pool.
 *
 * cores: number of worker threads to start; 0 means tmc_cpus_grid_total()-1,
 *        i.e. every grid CPU except the one reserved for the host.
 *
 * Workers run on cores 1..cores, and the calling thread is moved to core 0
 * as the host. Returns only after every worker has reported ready.
 *
 * Returns 0 on success, 1 on a CPU/UDN setup failure, or a pthread error
 * code if thread creation fails.
 */
int par_init( unsigned int cores )
{
    cpu_set_t cpus;

    if( cores == 0 )
        cores = tmc_cpus_grid_total()-1;

    // With zero workers par_sendjob() would block forever and
    // par_measure_report() would divide by zero.
    if( cores == 0 ) {
        printf( "Host: No worker cores available.\n" );
        return 1;
    }

    par_cores_total = cores;
    par_cores_max   = cores;

    if( tmc_cpus_get_my_affinity( &cpus ) != 0 ) {
        printf( "Host: tmc_cpus_get_my_affinity() failed.\n" );
        return 1;
    }

    // One CPU per worker plus one for the host. Keep the count signed and
    // compare in a sign-safe way so a negative value cannot wrap to a huge
    // unsigned one.
    int avail = tmc_cpus_count( &cpus );
    if( avail < 0 || (unsigned int)avail < cores+1 ) {
        printf( "Host: Insufficient cpus (have %d, need %u).\n", avail, cores+1 );
        return 1;
    }

    if( tmc_udn_init( &cpus ) < 0 ) {
        printf( "Host: tmc_udn_init() failed.\n" );
        return 1;
    }

    int rc = par_spawn_workers( cores );
    if( rc != 0 )
        return rc;

    // Put host on core 0
    if( tmc_cpus_set_my_cpu( 0 ) != 0 ) {
        printf( "Host: tmc_cpus_set_my_cpu() failed.\n" );
        return 1;
    }

    if( tmc_udn_activate() < 0 ) {
        printf( "Host: tmc_udn_activate() failed.\n" );
        return 1;
    }

    // Each worker sends one "ready" word after activating its UDN.
    for( unsigned int i = 0; i < cores; i++ )
        tmc_udn0_receive();

    return 0;
}

/* Begin a measurement interval.
 *
 * Drains outstanding jobs, then tells every worker to record its start
 * cycle and zero its work/wait counters. Clears the host wait counters and
 * records the host start cycle last, so the setup itself is not timed.
 */
void par_measure_start( void )
{
    par_wait();

    unsigned int core = 1;
    while( core <= par_cores_total ) {
        tmc_udn_send_1( tmc_udn_header_from_cpu( core ), UDN0_DEMUX_TAG, THREAD_MEASURE_START );
        core++;
    }

    host_wait_done  = 0;
    host_wait_start = 0;

    host_start = par_get_cycle_count();
}

/* End the measurement interval: drain outstanding jobs, have every worker
 * record its stop cycle, then record the host stop cycle. */
void par_measure_stop( void )
{
    par_wait();

    for( unsigned int target = 1; target <= par_cores_total; ++target )
        tmc_udn_send_1( tmc_udn_header_from_cpu( target ), UDN0_DEMUX_TAG, THREAD_MEASURE_STOP );

    host_stop = par_get_cycle_count();
}

/* Share of `part` in `total` as a percentage. Returns 0 when total is 0, so
 * an empty interval prints 0.000% rather than nan/inf. */
static float par_perc( uint64_t part, uint64_t total )
{
    if( total == 0 )
        return 0.0f;
    return (float)(part * 100) / (float)total;
}

/* Print one report row: `label` (5 columns wide), total cycles, work and
 * wait cycles with their share of the total, and the work+wait share.
 * No trailing newline is printed. */
static void par_print_row( const char *label, uint64_t total, uint64_t work, uint64_t wait )
{
    printf( "%s%11" PRIu64 " %11" PRIu64 " (%6.3f%%) %11" PRIu64 " (%6.3f%%) %6.3f%%",
            label,
            total,
            work, par_perc( work, total ),
            wait, par_perc( wait, total ),
            par_perc( work + wait, total ) );
}

/* Print the timing report for the last par_measure_start()/par_measure_stop()
 * interval. The report contains:
 *   - the host's wait times;
 *   - one row per core, where core 0 is the host and all its blocking counts
 *     as wait;
 *   - the average over the workers.
 * Each worker is queried with THREAD_MEASURE_REPORT and answers with its
 * total, work and wait cycle counts. Rows whose work+wait exceeds the total
 * are flagged with " !".
 */
void par_measure_report( void )
{
    par_wait();

    const uint64_t host_total = host_stop - host_start;

    printf( "Host wait ready: %11" PRIu64 " (%6.3f%%)\n", host_wait_start, par_perc( host_wait_start, host_total ) );
    printf( "Host wait done:  %11" PRIu64 " (%6.3f%%)\n", host_wait_done,  par_perc( host_wait_done,  host_total ) );

    printf( "\n" );
    printf( "Core       Total        Work                  Wait              W+W%%\n" );

    uint64_t total_time = 0;
    uint64_t total_work = 0;
    uint64_t total_wait = 0;

    for( unsigned int core = 0; core <= par_cores_total; core++ ) {
        uint64_t core_time, core_work, core_wait;

        if( core == 0 ) {
            core_time = host_total;
            core_work = 0;
            core_wait = host_wait_start + host_wait_done;
        } else {
            DynamicHeader header = tmc_udn_header_from_cpu( core );
            tmc_udn_send_1( header, UDN0_DEMUX_TAG, THREAD_MEASURE_REPORT );
            core_time = tmc_udn0_receive();
            core_work = tmc_udn0_receive();
            core_wait = tmc_udn0_receive();
            // Averages cover the workers only, not the host.
            total_time += core_time;
            total_work += core_work;
            total_wait += core_wait;
        }

        char label[16];
        snprintf( label, sizeof label, "%4u ", core );
        par_print_row( label, core_time, core_work, core_wait );

        if( core_work + core_wait > core_time ) printf( " !" );
        printf( "\n" );
    }

    // With no workers there is nothing to average; avoid dividing by zero.
    const unsigned int workers = par_cores_total;
    uint64_t avg_time = workers ? total_time / workers : 0;
    uint64_t avg_work = workers ? total_work / workers : 0;
    uint64_t avg_wait = workers ? total_wait / workers : 0;

    printf( "--------------------------------------------------------------------\n" );

    par_print_row( "Avg  ", avg_time, avg_work, avg_wait );
    printf( "\n" );

    fflush( stdout );
}

/* Drain outstanding jobs, then send THREAD_QUIT to every worker core so
 * each worker thread leaves its command loop and returns. This does not
 * wait for the threads themselves to finish. */
void par_shutdown( void )
{
    par_wait();

    unsigned int core = 1;
    while( core <= par_cores_total ) {
        DynamicHeader dest = tmc_udn_header_from_cpu( core );
        tmc_udn_send_1( dest, UDN0_DEMUX_TAG, THREAD_QUIT );
        ++core;
    }
}

// Read the cycle counter through an out-of-line call. The noinline attribute
// is deliberate: it keeps the optimizer from reordering the counter read
// relative to the code being timed.
uint64_t __attribute__((noinline))
par_get_cycle_count( void )
{
    const uint64_t cycles = get_cycle_count();
    return cycles;
}
