Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 51 additions & 36 deletions src/fread.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ static void init_const_literals(void)
typedef struct FieldParseContext {
// Pointer to the current parsing location
const char **ch;

// Parse target buffers, indexed by size. A parser that reads values of byte
// size `sz` will attempt to write that value into `targets[sz]`. Thus,
// generally this is an array with elements 0, 1, 4, and 8 defined, while all
// other pointers are NULL.

void **targets;
// String "anchor" for `Field()` parser -- the difference `ch - anchor` will
// be written out as the string offset.
Expand Down Expand Up @@ -334,7 +336,7 @@ static inline int countfields(const char **pch)
{
static lenOff trash; // see comment on other trash declarations
static void *targets[9];
targets[8] = (void*) &trash;
targets[8] = (void*)&trash;
const char *ch = *pch;
if (sep == ' ') while (*ch == ' ') ch++; // multiple sep==' ' at the start does not mean sep
skip_white(&ch);
Expand Down Expand Up @@ -637,7 +639,7 @@ static void str_to_i32_core(const char **pch, int32_t *target, bool parse_date)

// Field parser for 32-bit integers.
// Hands the context's parse-position pointer (`ctx->ch`) to the shared int32
// core and directs the result into the target slot indexed by the value's byte
// size, i.e. `targets[sizeof(int32_t)]`. `parse_date` is false here.
static void StrtoI32(FieldParseContext *ctx)
{
  // Exactly one call per field: the core receives the live parse position, so
  // a repeated call would not be a harmless no-op.
  str_to_i32_core(ctx->ch, (int32_t*)ctx->targets[sizeof(int32_t)], false);
}

static void StrtoI64(FieldParseContext *ctx)
Expand Down Expand Up @@ -826,7 +828,7 @@ static void parse_double_regular_core(const char **pch, double *target)

// Field parser for regular (decimal) doubles.
// Hands the context's parse-position pointer (`ctx->ch`) to the double core
// and directs the result into the target slot indexed by the value's byte
// size, i.e. `targets[sizeof(double)]`.
static void parse_double_regular(FieldParseContext *ctx)
{
  // Exactly one call per field: the core receives the live parse position, so
  // a repeated call would not be a harmless no-op.
  parse_double_regular_core(ctx->ch, (double*)ctx->targets[sizeof(double)]);
}

/**
Expand Down Expand Up @@ -1033,7 +1035,7 @@ static void parse_iso8601_date_core(const char **pch, int32_t *target)

// Field parser for ISO 8601 dates.
// Hands the context's parse-position pointer (`ctx->ch`) to the date core and
// directs the int32 result into the target slot indexed by the value's byte
// size, i.e. `targets[sizeof(int32_t)]`.
static void parse_iso8601_date(FieldParseContext *ctx)
{
  // Exactly one call per field: the core receives the live parse position, so
  // a repeated call would not be a harmless no-op.
  parse_iso8601_date_core(ctx->ch, (int32_t*)ctx->targets[sizeof(int32_t)]);
}

static void parse_iso8601_timestamp(FieldParseContext *ctx)
Expand Down Expand Up @@ -1329,8 +1331,22 @@ static int detect_types(const char **pch, int ncol, bool *bumped)
//=================================================================================================
int freadMain(freadMainArgs _args)
{
struct
{
double t0;
double map; // moment when memory-map step has finished
double layout; // Timer for assigning column names
double coltype; // Timer for applying user column class overrides
double alloc;
double read;
double reread;
double th_read;
double th_push; // reductions of timings within the parallel region
double tot;
} timestamps = { 0 };

args = _args; // assign to global for use by DTPRINT() in other functions
double t0 = wallclock();
timestamps.t0 = wallclock();

//*********************************************************************************************
// [1] Extract the arguments and check their validity
Expand Down Expand Up @@ -1438,7 +1454,7 @@ int freadMain(freadMainArgs _args)
// [2] Open and memory-map the input file, setting up the parsing context
// (sof, eof, ch).
//*********************************************************************************************
double tMap; // moment when memory-map step has finished

{
if (verbose) DTPRINT(_("[02] Opening the file\n"));
mmp = NULL;
Expand Down Expand Up @@ -1514,13 +1530,13 @@ int freadMain(freadMainArgs _args)
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, // # nocov
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov
}
sof = (const char*) mmp;
sof = (const char*)mmp;
if (verbose) DTPRINT(_(" Memory mapped ok\n"));
} else {
INTERNAL_STOP("neither `input` nor `filename` are given, nothing to read"); // # nocov
}
eof = sof + fileSize;
tMap = wallclock();
timestamps.map = wallclock();
}


Expand Down Expand Up @@ -2179,7 +2195,6 @@ int freadMain(freadMainArgs _args)
// [8] Assign column names
// Updates pos(ition) to rest after the column names (if any) at the start of the first data row
//*********************************************************************************************
double tLayout; // Timer for assigning column names
const char *colNamesAnchor = pos;
{
if (verbose) DTPRINT(_("[08] Assign column names\n"));
Expand All @@ -2204,7 +2219,7 @@ int freadMain(freadMainArgs _args)
// Use Field() here as it handles quotes, leading space etc inside it
ch++;
Field(&fctx); // stores the string length and offset as <uint,uint> in colNames[i]
((lenOff**) fctx.targets)[8]++;
((lenOff**)fctx.targets)[8]++;
if (*ch != sep) break;
if (sep == ' ') {
while (ch[1] == ' ') ch++;
Expand All @@ -2217,13 +2232,12 @@ int freadMain(freadMainArgs _args)
// now on first data row (row after column names)
// when fill=TRUE and column names shorter (test 1635.2), leave calloc initialized lenOff.len==0
}
tLayout = wallclock();
timestamps.layout = wallclock();
}

//*********************************************************************************************
// [9] Apply colClasses, select, drop and integer64
//*********************************************************************************************
double tColType; // Timer for applying user column class overrides
int ndrop; // Number of columns that will be dropped from the file being read
int nStringCols; // Number of string columns in the file
int nNonStringCols; // Number of all other columns in the file
Expand Down Expand Up @@ -2269,7 +2283,7 @@ int freadMain(freadMainArgs _args)
if (type[j] == CT_STRING) nStringCols++; else nNonStringCols++;
}
if (verbose) DTPRINT(_(" After %d type and %d drop user overrides : %s\n"), nUserBumped, ndrop, typesAsString(ncol));
tColType = wallclock();
timestamps.coltype = wallclock();
}

//*********************************************************************************************
Expand All @@ -2281,14 +2295,13 @@ int freadMain(freadMainArgs _args)
ncol - ndrop, ncol, ndrop, allocnrow);
}
size_t DTbytes = allocateDT(type, size, ncol, ndrop, allocnrow);
double tAlloc = wallclock();
timestamps.alloc = wallclock();

//*********************************************************************************************
// [11] Read the data
//*********************************************************************************************
bool stopTeam = false, firstTime = true, restartTeam = false; // bool for MT-safety (a half-written bool value can never be read badly)
int nTypeBump = 0, nTypeBumpCols = 0;
double tRead = 0, tReread = 0;
double thRead = 0, thPush = 0; // reductions of timings within the parallel region
int max_col = 0;
char *typeBumpMsg = NULL; size_t typeBumpMsgSize = 0;
Expand Down Expand Up @@ -2384,7 +2397,7 @@ int freadMain(freadMainArgs _args)
stopTeam = true;
}
prepareThreadContext(&ctx);

#pragma omp for ordered schedule(dynamic) reduction(+:thRead,thPush) reduction(max:max_col)
for (int jump = jump0; jump < nJumps; jump++) {
if (stopTeam) continue; // must continue and not break. We desire not to depend on (relatively new) omp cancel directive, yet
Expand Down Expand Up @@ -2412,7 +2425,7 @@ int freadMain(freadMainArgs _args)
// Important for thread safety inside progress() that this is called not just from critical but that
// it's the master thread too, hence me==0. OpenMP doesn't allow '#pragma omp master' here, but we
// did check above that master's me==0.
int ETA = (int)(((now - tAlloc) / jump) * (nJumps - jump));
int ETA = (int)(((now - timestamps.alloc) / jump) * (nJumps - jump));
progress((int)(100.0 * jump / nJumps), ETA);
}
}
Expand Down Expand Up @@ -2462,7 +2475,7 @@ int freadMain(freadMainArgs _args)
fun[IGNORE_BUMP(thisType)](&fctx);
if (*tch != sep) break;
int8_t thisSize = size[j];
if (thisSize) ((char**) targets)[thisSize] += thisSize; // 'if' for when rereading to avoid undefined NULL+0
if (thisSize) ((char**)targets)[thisSize] += thisSize; // 'if' for when rereading to avoid undefined NULL+0
tch++;
j++;
}
Expand All @@ -2476,7 +2489,7 @@ int freadMain(freadMainArgs _args)
}
else if (eol(&tch) && j < ncol) { // j<ncol needed for #2523 (erroneous extra comma after last field)
int8_t thisSize = size[j];
if (thisSize) ((char**) targets)[thisSize] += thisSize;
if (thisSize) ((char**)targets)[thisSize] += thisSize;
j++;
if (j > max_col) max_col = j;
if (j == ncol) { tch++; myNrow++; continue; } // next line. Back up to while (tch<nextJumpStart). Usually happens, fastest path
Expand Down Expand Up @@ -2593,7 +2606,7 @@ int freadMain(freadMainArgs _args)
}
}
int8_t thisSize = size[j];
if (thisSize) ((char**) targets)[size[j]] += size[j]; // 'if' to avoid undefined NULL+=0 when rereading
if (thisSize) ((char**)targets)[size[j]] += size[j]; // 'if' to avoid undefined NULL+=0 when rereading
j++;
if (*tch == sep) { tch++; continue; }
if (fill && (*tch == '\n' || *tch == '\r' || tch == eof) && j < ncol) continue; // reuse processors to write appropriate NA to target; saves maintenance of a type switch down here
Expand Down Expand Up @@ -2672,6 +2685,7 @@ int freadMain(freadMainArgs _args)
// Next thread can now start her ordered section and write her results to the final DT at the same time as me.
// Ordered has to be last in some OpenMP implementations currently. Logically though, pushBuffer happens now.
}

// End for loop over all jump points

// Push out all buffers one last time (only needed because of gomp ordered workaround above with push first in the loop)
Expand Down Expand Up @@ -2737,7 +2751,7 @@ int freadMain(freadMainArgs _args)
if (args.showProgress) progress(100, 0);

if (firstTime) {
tReread = tRead = wallclock();
timestamps.reread = timestamps.read = wallclock();

// if nTypeBump>0, not-bumped columns are about to be assigned parse type TOGGLE_BUMP(CT_STRING) for the reread, so we have to count
// parse types now (for log). We can't count final column types afterwards because many parse types map to the same column type.
Expand Down Expand Up @@ -2776,14 +2790,15 @@ int freadMain(freadMainArgs _args)
continue;
}
} else {
tReread = wallclock();
timestamps.reread = wallclock();
}

break;
}
double tTot = tReread - t0; // tReread==tRead when there was no reread

timestamps.tot = timestamps.reread - timestamps.t0; // timestamps.reread == timestamps.read when there was no reread
if (verbose) DTPRINT(_("Read %"PRIu64" rows x %d columns from %s file in %02d:%06.3f wall clock time\n"),
(uint64_t)DTi, ncol - ndrop, filesize_to_str(fileSize), (int)tTot / 60, fmod(tTot, 60.0));
(uint64_t)DTi, ncol - ndrop, filesize_to_str(fileSize), (int)timestamps.tot / 60, fmod(timestamps.tot, 60.0));

//*********************************************************************************************
// [12] Finalize the datatable
Expand Down Expand Up @@ -2829,25 +2844,25 @@ int freadMain(freadMainArgs _args)

if (verbose) {
DTPRINT("=============================\n"); // # notranslate
if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024));
DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot);
if (timestamps.tot < 0.000001) timestamps.tot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), timestamps.map - timestamps.t0, 100.0 * (timestamps.map - timestamps.t0) / timestamps.tot, 1.0 * fileSize / (1024 * 1024 * 1024));
DTPRINT(_("%8.3fs (%3.0f%%) sep="), timestamps.layout - timestamps.map, 100.0 * (timestamps.layout - timestamps.map) / timestamps.tot);
DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? "'\\n'" : "'%c'"), sep); // # notranslate
DTPRINT(_(" ncol=%d and header detection\n"), ncol);
DTPRINT(_("%8.3fs (%3.0f%%) Column type detection using %"PRId64" sample rows\n"),
tColType - tLayout, 100.0 * (tColType - tLayout) / tTot, sampleLines);
timestamps.coltype - timestamps.layout, 100.0 * (timestamps.coltype - timestamps.layout) / timestamps.tot, sampleLines);
DTPRINT(_("%8.3fs (%3.0f%%) Allocation of %"PRId64" rows x %d cols (%.3fGiB) of which %"PRId64" (%3.0f%%) rows used\n"),
tAlloc - tColType, 100.0 * (tAlloc - tColType) / tTot, allocnrow, ncol, DTbytes / (1024.0 * 1024 * 1024), DTi, 100.0 * DTi / allocnrow);
timestamps.alloc - timestamps.coltype, 100.0 * (timestamps.alloc - timestamps.coltype) / timestamps.tot, allocnrow, ncol, DTbytes / (1024.0 * 1024 * 1024), DTi, 100.0 * DTi / allocnrow);
thRead /= nth; thPush /= nth;
double thWaiting = tReread - tAlloc - thRead - thPush;
double thWaiting = timestamps.reread - timestamps.alloc - thRead - thPush;
DTPRINT(_("%8.3fs (%3.0f%%) Reading %d chunks (%d swept) of %.3fMiB (each chunk %"PRId64" rows) using %d threads\n"),
tReread - tAlloc, 100.0 * (tReread - tAlloc) / tTot, nJumps, nSwept, (double)chunkBytes / (1024 * 1024), DTi / nJumps, nth);
DTPRINT(_(" + %8.3fs (%3.0f%%) Parse to row-major thread buffers (grown %d times)\n"), thRead, 100.0 * thRead / tTot, buffGrown);
DTPRINT(_(" + %8.3fs (%3.0f%%) Transpose\n"), thPush, 100.0 * thPush / tTot);
DTPRINT(_(" + %8.3fs (%3.0f%%) Waiting\n"), thWaiting, 100.0 * thWaiting / tTot);
timestamps.reread - timestamps.alloc, 100.0 * (timestamps.reread - timestamps.alloc) / timestamps.tot, nJumps, nSwept, (double)chunkBytes / (1024 * 1024), DTi / nJumps, nth);
DTPRINT(_(" + %8.3fs (%3.0f%%) Parse to row-major thread buffers (grown %d times)\n"), thRead, 100.0 * thRead / timestamps.tot, buffGrown);
DTPRINT(_(" + %8.3fs (%3.0f%%) Transpose\n"), thPush, 100.0 * thPush / timestamps.tot);
DTPRINT(_(" + %8.3fs (%3.0f%%) Waiting\n"), thWaiting, 100.0 * thWaiting / timestamps.tot);
DTPRINT(_("%8.3fs (%3.0f%%) Rereading %d columns due to out-of-sample type exceptions\n"),
tReread - tRead, 100.0 * (tReread - tRead) / tTot, nTypeBumpCols);
DTPRINT(_("%8.3fs Total\n"), tTot);
timestamps.reread - timestamps.read, 100.0 * (timestamps.reread - timestamps.read) / timestamps.tot, nTypeBumpCols);
DTPRINT(_("%8.3fs Total\n"), timestamps.tot);
if (typeBumpMsg) {
// if type bumps happened, it's useful to see them at the end after the timing 2 lines up showing the reread time
// TODO - construct and output the copy and pastable colClasses argument so user can avoid the reread time if they are
Expand Down
Loading