Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0c7404b
stopping point
stevenqie Apr 30, 2025
d830a5d
resolved linker errors
stevenqie May 1, 2025
a82ddbe
Merge branch 'main' into sq-taskqueue
stevenqie May 1, 2025
40421c2
Merge branch 'main' into sq-taskqueue
stevenqie May 4, 2025
9214c34
consildate everything into just converse_itnernal and convcore
stevenqie May 4, 2025
4439108
print statements
stevenqie May 5, 2025
557801e
Merge branch 'main' into sq-taskqueue
stevenqie May 5, 2025
0fd0f7b
working with other examples now
stevenqie May 5, 2025
9c9f4c0
general layout
stevenqie May 5, 2025
30a5714
working example but not if you define k really big
stevenqie May 6, 2025
fd2b68f
undefined handler functionality + clean up comments
stevenqie May 6, 2025
34443d7
Merge branch 'main' into sq-taskqueue
stevenqie May 6, 2025
ca8e293
reordering
stevenqie May 6, 2025
1b94c90
try again
stevenqie May 6, 2025
feafc33
testing to see if build test passes now
stevenqie May 8, 2025
8fee676
try again
stevenqie May 8, 2025
4cd5fd7
remove lci1 test
ritvikrao May 8, 2025
ad5c110
example bug where stuff is hanging fixed
stevenqie May 8, 2025
48b7912
Merge branch 'sq-taskqueue' of https://github.com/charmplusplus/recon…
stevenqie May 8, 2025
51c6264
null check + start of CmiSynctaskQSendndFree
stevenqie May 8, 2025
34456fe
remove print statement + change to sendandfree
stevenqie May 8, 2025
ef9109d
task queue steal cange
stevenqie May 8, 2025
2a33291
change value of k
stevenqie May 8, 2025
3f4032b
add head check
stevenqie May 8, 2025
e806701
add task queu logic to csdschdulerpoll
stevenqie May 8, 2025
e510f1d
print statement remove
stevenqie May 8, 2025
c8c9fd1
merge
ritvikrao Jun 30, 2025
9babb95
newline
ritvikrao Jun 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ add_subdirectory(reduction_node)
add_subdirectory(cth)
add_subdirectory(self_send)
add_subdirectory(ctv)
add_subdirectory(taskqueue)
1 change: 0 additions & 1 deletion examples/reduction_dual/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
add_reconverse_executable(reduction_dual reduction_dual.cpp)
add_test(NAME reduction_dual COMMAND reduction_dual +pe 7)
add_test(NAME reduction_dual-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 2 $<TARGET_FILE:reduction_dual> +pe 4)
1 change: 1 addition & 0 deletions examples/taskqueue/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_reconverse_executable(taskqueue taskqueue.cpp)
23 changes: 23 additions & 0 deletions examples/taskqueue/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Compiler and flags
CXX := g++
CXXFLAGS := -std=c++11 -pthread -I ../..

# Source files
SRCS := taskqueue.cpp ../../convcore.cpp ../../scheduler.cpp ../../queue.cpp ../../conv-conds.cpp
HDRS := ../../converse_internal.h ../../scheduler.h ../../queue.h ../../barrier.h

# Output executable
TARGET := test

# Default target
all: $(TARGET)

# Link the object files to create the executable
$(TARGET): $(SRCS)
$(CXX) -o $@ $^ $(CXXFLAGS)

# Clean up build files
clean:
rm -f $(TARGET)

.PHONY: all clean
69 changes: 69 additions & 0 deletions examples/taskqueue/taskqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include "converse.h"
#include <pthread.h>
#include <stdio.h>
#include <atomic>

#define K 2000 // will tend to abort if your k value is too high (higher than the max task queue size)
#define X 10000

thread_local int tasksExecuted = 0;
static std::atomic<int> globalCounter{K};

int handlerID;
int print_handlerID;

struct Message {
CmiMessageHeader header;
int data[1];
};

void print_results_handler_func(void* vmsg) {
printf("PE %d executed %d tasks.\n", CmiMyRank(), tasksExecuted);
CmiNodeBarrier();
if (CmiMyRank() == 0) {
CmiExit(0);
}
}

void handler_func(void *vmsg) {
Message* incoming_msg = (Message*)vmsg;
//printf("PE %d pinged this function with data index: %d.\n", CmiMyRank(), incoming_msg->data[0]);

// do dummy work
for (int i = 0; i < X; i++) {
CmiWallTimer();
}

tasksExecuted++;

int prev = globalCounter.fetch_sub(1, std::memory_order_acq_rel);
//CmiPrintf("Current globalCounter: %d\n", prev - 1);
if (prev == 1) {
Message* msg = new Message;
msg->header.handlerId = print_handlerID;
msg->header.messageSize = sizeof(Message);
printf("All tasks have been executed.\n");
CmiSyncBroadcastAllAndFree(sizeof(Message), msg);
}
}

CmiStartFn mymain(int argc, char **argv) {
handlerID = CmiRegisterHandler(handler_func);
print_handlerID = CmiRegisterHandler(print_results_handler_func);

if (CmiMyPe() == 0) {
for (int i = 0; i < K; i++) {
Message* newmsg = new Message;
newmsg->data[0] = i;
newmsg->header.messageSize = sizeof(Message);
newmsg->header.handlerId = handlerID;
CmiSyncTaskQSendAndFree(CmiMyRank(), sizeof(Message), newmsg);
}
}
return 0;
}

int main(int argc, char **argv) {
ConverseInit(argc, argv, (CmiStartFn)mymain);
return 0;
}
3 changes: 3 additions & 0 deletions include/converse.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ void CmiFreeSendFn(int destPE, int messageSize, char *msg);
void CmiSyncListSendFn(int npes, const int *pes, int len, char *msg);
void CmiFreeListSendFn(int npes, const int *pes, int len, char *msg);

void CmiSyncTaskQSend(int destPE, int messageSize, void *msg);
void CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg);

// broadcasts
void CmiSyncBroadcast(int size, void *msg);
void CmiSyncBroadcastAndFree(int size, void *msg);
Expand Down
184 changes: 177 additions & 7 deletions src/convcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ int _replaySystem = 0;
void CldModuleInit(char **);

// PE LOCALS that need global access sometimes
static ConverseQueue<void *> **Cmi_queues; // array of queue pointers
ConverseQueue<void *> **Cmi_queues = nullptr; // array of queue pointers
TaskQueue** Cmi_taskqueues = nullptr;

// PE LOCALS
thread_local int Cmi_myrank;
Expand All @@ -58,6 +59,7 @@ thread_local double idle_time;

// Special operation handlers (TODO: should these be special values instead like
// the exit handler)
int Cmi_undefinedHandler;
int Cmi_exitHandler;

// TODO: padding for all these thread_locals and cmistates?
Expand Down Expand Up @@ -87,15 +89,20 @@ void converseRunPe(int rank) {
// init state
CmiInitState(rank);

// init things like cld module, ccs, etc
CldModuleInit(Cmi_argv);

#ifdef SET_CPU_AFFINITY
CmiSetCPUAffinity(rank);
#endif

Cmi_undefinedHandler = CmiRegisterHandler(CmiUndefinedHandler); //should be at index 0 which is what gets called if user doesnt define a handlerid in their message
Cmi_exitHandler = CmiRegisterHandler(CmiExitHandler);

//initalize collective operations/arrays/handlers/etc
collectiveInit();
// Cmi_multicastHandler = CmiRegisterHandler(CmiMulticastHandler);

// init things like cld module, ccs, etc
// moved this after Cmi_Exithandler so exithandler will theoretically be
//registered at index 0, so if user forgets to specify handlerid, it will exit
CldModuleInit(Cmi_argv);

// barrier to ensure all global structs are initialized
CmiNodeBarrier();
Expand All @@ -105,17 +112,24 @@ void converseRunPe(int rank) {

// call initial function and start scheduler
Cmi_startfn(Cmi_argc, Cmi_argv);

//printf("%d: taskqueue pointer: %p\n", __LINE__, CsvAccess(task_q)[Cmi_myrank]);
CsdScheduler();

// cleanup of threads? : destroy each threads task queue and reduction table struct(not the struct itself)
TaskQueueDestroy(Cmi_taskqueues[Cmi_myrank]);
//free(CpvAccess(_reduction_info));
}

void CmiStartThreads() {
// allocate global arrayss
Cmi_queues = new ConverseQueue<void *> *[Cmi_mynodesize];
CmiHandlerTable = new std::vector<CmiHandlerInfo> *[Cmi_mynodesize];
CmiNodeQueue = new ConverseNodeQueue<void *>();
Cmi_taskqueues = new TaskQueue*[Cmi_mynodesize];

_smp_mutex = CmiCreateLock();

// make sure the queues are allocated before PEs start sending messages around
comm_backend::barrier();

Expand All @@ -135,9 +149,12 @@ void CmiStartThreads() {
delete[] Cmi_queues;
delete CmiNodeQueue;
delete[] CmiHandlerTable;
delete [] Cmi_taskqueues;

Cmi_queues = nullptr;
CmiNodeQueue = nullptr;
CmiHandlerTable = nullptr;
Cmi_taskqueues = nullptr;
}

// argument form: ./prog +pe <N>
Expand Down Expand Up @@ -212,10 +229,16 @@ void CmiInitState(int rank) {
// allocate global entries
ConverseQueue<void *> *queue = new ConverseQueue<void *>();
std::vector<CmiHandlerInfo> *handlerTable = new std::vector<CmiHandlerInfo>();

Cmi_queues[Cmi_myrank] = queue;
CmiHandlerTable[Cmi_myrank] = handlerTable;

//task queue stuff
Cmi_taskqueues[Cmi_myrank] = TaskQueueCreate();
if(CmiMyNodeSize() > 1) {
CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL);
CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE, (CcdCondFn) TaskStealBeginIdle, NULL);
}

// random
CrnInit();
CpvInitialize(std::vector<NcpyOperationInfo *>,
Expand Down Expand Up @@ -288,6 +311,8 @@ void CmiNumberHandlerEx(int n, CmiHandlerEx h, void *userPtr) {

void CmiPushPE(int destPE, int messageSize, void *msg) {
int rank = CmiRankOf(destPE);
// printf("in cmipushpe: myPe: %d, destPe: %d, nodeSize: %d\n", CmiMyPe(), destPE,
// Cmi_mynodesize);
CmiAssertMsg(
rank >= 0 && rank < Cmi_mynodesize,
"CmiPushPE(myPe: %d, destPe: %d, nodeSize: %d): rank out of range",
Expand Down Expand Up @@ -354,6 +379,7 @@ void CmiSyncSendAndFree(int destPE, int messageSize, void *msg) {
}

if (CmiMyNode() == destNode) {

CmiPushPE(destPE, messageSize, msg);
} else {
comm_backend::issueAm(destNode, msg, messageSize, comm_backend::MR_NULL,
Expand Down Expand Up @@ -427,6 +453,14 @@ void CmiExitHandler(void *msg) {
CsdExitScheduler();
}

// this will only stop the program on pe1, so it will exit the program if its +pe 1 and hang if it is +pe > 1
void CmiUndefinedHandler(void* msg) {
(void)msg;

CmiPrintf("Possible undefined handler called. Please check to see if you've populated the handlerId field of your message.\n");
CsdExitScheduler();
}

ConverseNodeQueue<void *> *CmiGetNodeQueue() { return CmiNodeQueue; }

void CmiSyncNodeSendAndFree(unsigned int destNode, unsigned int size,
Expand Down Expand Up @@ -974,3 +1008,139 @@ void StopInteropScheduler()
{
CpvAccess(interopExitFlag) = 1;
}

//Task Queue Functions/Definitions
void CmiSyncTaskQSend(int destPE, int messageSize, void *msg) {
char *copymsg = (char *)CmiAlloc(messageSize);
std::memcpy(copymsg, msg,
messageSize); // optionally avoid memcpy and block instead

CmiSyncTaskQSendAndFree(destPE, messageSize, copymsg);
}

void CmiSyncTaskQSendAndFree(int destPE, int messageSize, void *msg) {
int destindex = CmiRankOf(destPE);
TaskQueue* dest_taskq = (TaskQueue*)(Cmi_taskqueues[destindex]);
if (dest_taskq == NULL) {
CmiFree(msg);
return;
}

TaskQueuePush(dest_taskq, msg);
return;
}

// Function to create a new TaskQueue and initialize its members
TaskQueue* TaskQueueCreate() {
TaskQueue* taskqueue = (TaskQueue*)malloc(sizeof(TaskQueue));
taskqueue->head = 0;
taskqueue->tail = 0;
for (int i = 0; i < TASKQUEUE_SIZE; i++) {
taskqueue->data[i] = NULL;
}
return taskqueue;
}

// Function to push a task onto the TaskQueue
void TaskQueuePush(TaskQueue* queue, void* data) {
// compute estimated size
//probably not thread safe, so that's why its estimated size?
taskq_idx head = queue->head;
taskq_idx tail = queue->tail;
size_t estimated_size = std::abs(tail - head);
if (estimated_size > TASKQUEUE_SIZE - (2 * CmiMyNodeSize())) {
CmiPrintf("tail: %lu, head: %lu\n", tail, head);
CmiAbort("TaskQueuePush: TaskQueue is approaching full. Estimated size: %lu, threshold: %lu \n", estimated_size, TASKQUEUE_SIZE - (2 * CmiMyNodeSize()));
}

queue->data[queue->tail % TASKQUEUE_SIZE] = data;
CmiMemoryWriteFence(); //makes sure the data is fully written before updating the tail pointer
queue->tail++;
}

// Function to pop a task from the TaskQueue. Victims pop from the tail
void* TaskQueuePop(TaskQueue* queue) {
queue->tail = queue->tail - 1;
CmiMemoryWriteFence();
taskq_idx head = queue->head;
taskq_idx tail = queue->tail;
if (tail > head) { // there are more than two tasks in the queue, so it is safe to pop a task from the queue.
return queue->data[tail % TASKQUEUE_SIZE];
}

if (tail < head) { // The taskqueue is empty and the last task has been stolen by a thief.
queue->tail = head; // reset the tail pointer to the head pointer
return NULL;
}

// head==tail case: there is only one task so thieves and victim can try to obtain this task simultaneously.
queue->tail = head + 1;
if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the last task has already stolen.
return NULL;
}
return queue->data[tail % TASKQUEUE_SIZE];
}

// Function to steal a task from another TaskQueue. Other PEs/Threads steal from the head
void* TaskQueueSteal(TaskQueue* queue) {
if (queue == NULL) {
return NULL;
}

taskq_idx head, tail;
while (1) {
head = queue->head;
tail = queue->tail;
if (head >= tail) {
// The queue is empty
// or the last element has been stolen by other thieves
// or popped by the victim.
return NULL;
}

if (!__sync_bool_compare_and_swap(&(queue->head), head, head+1)) { // Check whether the task this thief is trying to steal is still in the queue and not stolen by the other thieves.
continue;
}
if (head < 0) {
return NULL;
}
return queue->data[head % TASKQUEUE_SIZE];
}
}

// Function to destroy the TaskQueue and free its memory
void TaskQueueDestroy(TaskQueue* queue) {
if (queue != NULL) {
free(queue);
queue = NULL;
}
}

void StealTask() {
// start up timer if trace is enabled
//steal from a random PE on the same node
int random_rank = CrnRand() % (CmiMyNodeSize()-1);
if (random_rank == CmiMyRank()) {
random_rank++;
if (random_rank >= CmiMyNodeSize()) {
random_rank = 0; // wrap around if our random_selected node is the same as our own
// and we are the last PE on our node
}
}
if (Cmi_taskqueues[random_rank] != NULL) {
void* msg = TaskQueueSteal((TaskQueue*)(Cmi_taskqueues[random_rank]));
if (msg != NULL && Cmi_taskqueues[Cmi_myrank] != NULL) {
TaskQueuePush((TaskQueue*)(Cmi_taskqueues[Cmi_myrank]), msg);
}
}
}

// this function is passed into CcdCallOnConditionKeep
void TaskStealBeginIdle() {
// can discuss whether we need to add the isHelper csv variable that is in old converse.
// not going to add it for now, because it's turned/left on by default in old converse
if (CmiMyNodeSize() > 1) {
StealTask();
}
}

Loading
Loading