From 83b07c3a625211348a003db06322f09b197c0038 Mon Sep 17 00:00:00 2001
From: Sujith Kakarlapudi
Date: Sat, 17 May 2025 18:48:09 -0700
Subject: [PATCH] Log-based impression storage, computeStatistics improvements, and ArcTool query support

---
 src/configs/Config.php         |   6 +
 src/executables/ArcTool.php    | 276 ++++++++++++++++++++++++++++++++-
 src/models/ImpressionModel.php | 212 ++++++++++++++++++----
 3 files changed, 459 insertions(+), 35 deletions(-)

diff --git a/src/configs/Config.php b/src/configs/Config.php
index 80b9324cb..215b6d37d 100755
--- a/src/configs/Config.php
+++ b/src/configs/Config.php
@@ -504,6 +504,12 @@ if (file_exists(WORK_DIRECTORY . PROFILE_FILE_NAME)) {
          * and certain bloom filter
          */
         nsconddefine('DATA_DIR', WORK_DIRECTORY . "/data");
+        /**
+         * Directory used to store user and item impression logs.
+         * These logs back the PartitionDocumentBundle for recording
+         * and retrieving impression records.
+         */
+        nsconddefine('IMPRESSION_LOG_DIR', LOG_DIR . "/ImpressionRecords");
     }
 } else {
     if ((!isset( $_SERVER['SERVER_NAME']) ||
diff --git a/src/executables/ArcTool.php b/src/executables/ArcTool.php
index 1889a19e6..9d45e6c3a 100755
--- a/src/executables/ArcTool.php
+++ b/src/executables/ArcTool.php
@@ -46,6 +46,8 @@ use seekquarry\yioop\library\UrlParser;
 use seekquarry\yioop\library\WebArchiveBundle;
 use seekquarry\yioop\library\media_jobs\FeedsUpdateJob;
 use seekquarry\yioop\controllers\AdminController;
+use seekquarry\yioop\models as M;
+use seekquarry\yioop\models\ImpressionModel;
 
 if (php_sapi_name() != 'cli' ||
     defined("seekquarry\\yioop\\configs\\IS_OWN_WEB_SERVER")) {
@@ -57,6 +59,7 @@ $_SERVER["LOG_TO_FILES"] = false;
 $_SERVER["USE_CACHE"] = false;
 /** For crawlHash, crawlHashWord function */
 require_once __DIR__."/../library/Utility.php";
+require_once C\BASE_DIR . '/configs/Config.php';
 if (!C\PROFILE) {
     echo "Please configure the search engine instance by visiting" .
         "its web interface on localhost.\n";
@@ -105,10 +108,21 @@ class ArcTool extends DictionaryUpdater implements CrawlConstants
     public function start()
     {
         global $argv;
-        if (!isset($argv[1]) || (!isset($argv[2]) && $argv[1] != "list") ||
-            (!isset($argv[3]) && in_array($argv[1],
-            [ "dict", "inject", "make-filter"] ) || (
-            !isset($argv[4]) && $argv[1] == "doc-lookup")) ) {
+        if (
+            !isset($argv[1])
+            || (
+                !isset($argv[2])
+                && !in_array($argv[1], ['list', 'user-impressions'], true)
+            )
+            || (
+                in_array($argv[1], ['dict', 'inject', 'make-filter'], true)
+                && !isset($argv[3])
+            )
+            || (
+                $argv[1] === 'doc-lookup'
+                && !isset($argv[4])
+            )
+        ) {
             $this->usageMessageAndExit();
         }
         if (!in_array($argv[1], [ "check-filter", "make-filter", "list"])) {
@@ -206,6 +220,39 @@ class ArcTool extends DictionaryUpdater implements CrawlConstants
                 $argv[4] ??= 1;
                 $this->outputShowPages($path, $argv[3], $argv[4]);
                 break;
+            case "user-impressions":
+                $key = isset($argv[2]) ? (string)$argv[2] : null;
+                $partition = isset($argv[3]) ? (int)$argv[3] : null;
+                $limit = isset($argv[4]) ? (int)$argv[4] : null;
+                $this->displayLogRecords($key, $partition, $limit);
+                break;
+            case "item-impressions":
+                if (!is_numeric($argv[2])) {
+                    $this->usageMessageAndExit();
+                }
+                $item_id = (int)$argv[2];
+                $partition = isset($argv[3]) ? (int)$argv[3] : null;
+                $limit = isset($argv[4]) ? (int)$argv[4] : null;
+                $this->displayLogRecordsByItemId($item_id, $partition, $limit);
+                break;
+            case "user-items":
+                if (!isset($argv[2]) || !is_numeric($argv[2])) {
+                    $this->usageMessageAndExit();
+                }
+                $this->getItemIdByUser((int)$argv[2]);
+                break;
+            case "group-items":
+                if (!isset($argv[2]) || !is_numeric($argv[2])) {
+                    $this->usageMessageAndExit();
+                }
+                $this->getItemIdByGroup((int)$argv[2]);
+                break;
+            case "type-items":
+                if (!isset($argv[2])) {
+                    $this->usageMessageAndExit();
+                }
+                $this->getItemIdByType($argv[2]);
+                break;
             default:
                 $this->usageMessageAndExit();
         }
@@ -1575,6 +1622,198 @@ EOD;
         $db->setWorldPermissionsRecursive($this->tmp_results);
         return $iterator;
     }
+    /**
+     * Prints impression log records stored in the ImpressionRecords folder.
+     *
+     * @param string $key Optional primary key (user_id) to filter records by.
+     * @param int $partition Optional partition number; if omitted, every
+     *      partition from 0 through SAVE_PARTITION is read.
+     * @param int $limit Optional record limit handed to unpack() for each
+     *      key; if omitted, all records are unpacked.
+     * @return void
+     */
+    public function displayLogRecords($key = null, $partition = null,
+        $limit = null)
+    {
+        $folder = C\IMPRESSION_LOG_DIR;
+        $format = [
+            "PRIMARY KEY" => "user_id",
+            "item_id" => "INT",
+            "type_id" => "INT",
+            "timestamp" => "INT"
+        ];
+        $pdb = new PartitionDocumentBundle($folder, $format);
+        $max_partition = $pdb->parameters['SAVE_PARTITION'];
+        $partitions = $partition === null
+            ? range(0, $max_partition)
+            : [ (int)$partition ];
+
+        foreach ($partitions as $part) {
+            $index = $pdb->loadPartitionIndex(
+                $part,
+                false,
+                PackedTableTools::APPEND_MODE
+            );
+            $keys_to_process = $key === null
+                ? array_keys($index)
+                : [ (string)$key ];
+            foreach ($keys_to_process as $user_id) {
+                if (! isset($index[$user_id])) {
+                    continue;
+                }
+                $packed_row = $index[$user_id];
+                $rows = $limit === null
+                    ? $pdb->table_tools->unpack($packed_row)
+                    : $pdb->table_tools->unpack($packed_row, 0, $limit);
+                foreach ($rows as $record) {
+                    echo "Key: {$user_id}\n";
+                    print_r($record);
+                    echo "-------------------------\n";
+                }
+            }
+        }
+    }
+    /**
+     * Like displayLogRecords(), but filters by item_id instead of user_id.
+     *
+     * @param int $item_id The item ID to filter on (required)
+     * @param int $partition Optional partition index; all if omitted
+     * @param int $limit Optional max number of matching records to print
+     */
+    public function displayLogRecordsByItemId($item_id, $partition = null,
+        $limit = null)
+    {
+        if (!is_numeric($item_id)) {
+            echo "Invalid item_id\n";
+            return;
+        }
+        $folder = C\IMPRESSION_LOG_DIR;
+        $format = [
+            "PRIMARY KEY" => "user_id",
+            "item_id" => "INT",
+            "type_id" => "INT",
+            "timestamp" => "INT"
+        ];
+        $pdb = new PartitionDocumentBundle($folder, $format);
+        $max_partition = $pdb->parameters['SAVE_PARTITION'];
+        $partitions = $partition !== null
+            ? [(int)$partition]
+            : range(0, $max_partition);
+        $matched = [];
+        foreach ($partitions as $part) {
+            $index = $pdb->loadPartitionIndex($part, false,
+                PackedTableTools::APPEND_MODE);
+            foreach ($index as $key => $packed_row) {
+                $rows = $pdb->table_tools->unpack($packed_row);
+                foreach ($rows as $record) {
+                    if ($record['item_id'] == $item_id) {
+                        $matched[] = ['key' => $key, 'record' => $record];
+                        if ($limit !== null && count($matched) >= $limit) {
+                            break 3;
+                        }
+                    }
+                }
+            }
+        }
+        if (empty($matched)) {
+            echo "No records found for item_id: $item_id\n";
+            return;
+        }
+        foreach ($matched as $entry) {
+            echo "Key: {$entry['key']}\n";
+            print_r($entry['record']);
+            echo "-------------------------\n";
+        }
+    }
+    /**
+     * Retrieve and display all item IDs associated with a given user.
+     *
+     * This method queries the ITEM_IMPRESSION_SUMMARY table for entries
+     * matching the specified USER_ID. Each matching record's ITEM_ID column
+     * is printed on its own line to stdout.
+     *
+     * @param int $user_id The user identifier to filter summary records by.
+     */
+    public function getItemIdByUser(int $user_id)
+    {
+        $model = new ImpressionModel();
+        $db = $model->db;
+        $sql = "SELECT ITEM_ID FROM ITEM_IMPRESSION_SUMMARY WHERE USER_ID = ?";
+        $res = $db->execute($sql, [$user_id]);
+        while ($row = $db->fetchArray($res)) {
+            echo $row['ITEM_ID'], "\n";
+        }
+    }
+    /**
+     * Retrieve and display all item IDs belonging to a specific group.
+     *
+     * This method queries the GROUP_ITEM table for entries matching the
+     * specified GROUP_ID. Each resulting record's ID column is printed on its
+     * own line.
+     *
+     * @param int $group_id The group identifier to filter GROUP_ITEM records
+     *      by.
+     */
+    public function getItemIdByGroup(int $group_id)
+    {
+        $model = new ImpressionModel();
+        $db = $model->db;
+        $sql = "SELECT ID FROM GROUP_ITEM WHERE GROUP_ID = ?";
+        $res = $db->execute($sql, [$group_id]);
+        while ($row = $db->fetchArray($res)) {
+            echo $row['ID'], "\n";
+        }
+    }
+    /**
+     * Retrieve and display all item IDs of a specific type.
+     *
+     * Accepts either a human-readable type name (thread, wiki, group,
+     * query, cache, resource) or the corresponding integer code. Numeric
+     * input (including numeric strings from the command line) is used as
+     * the code directly; names are looked up in an internal map. Prints each
+     * matching ITEM_ID on its own line. If no records match, outputs
+     * "No Items of that type".
+     *
+     * @param string|int $item_type The type name or numeric code to use.
+     *      Valid names: thread, wiki, group, query, cache, resource.
+     *      Numeric codes: 1 (thread), 2 (wiki), 3 (group), 4 (query),
+     *      5 (cache), 6 (resource).
+     */
+    public function getItemIdByType($item_type)
+    {
+        $type_map = [
+            'thread' => 1,
+            'wiki' => 2,
+            'group' => 3,
+            'query' => 4,
+            'cache' => 5,
+            'resource' => 6
+        ];
+        if (is_numeric($item_type)) {
+            $type_id = (int)$item_type;
+        } else {
+            $key = strtolower((string)$item_type);
+            if (!isset($type_map[$key])) {
+                $valid = implode(', ', array_keys($type_map));
+                echo "Unknown item type '{$item_type}'. Valid types are: "
+                    . "{$valid}\n";
+                return;
+            }
+            $type_id = $type_map[$key];
+        }
+        $model = new ImpressionModel();
+        $db = $model->db;
+        $sql = "SELECT ITEM_ID FROM ITEM_IMPRESSION_SUMMARY WHERE ITEM_TYPE = ?";
+        $res = $db->execute($sql, [$type_id]);
+        $row = $db->fetchArray($res);
+        if (!$row) {
+            echo "No Items of that type\n";
+        } else {
+            do {
+                echo $row['ITEM_ID'], "\n";
+            } while ($row = $db->fetchArray($res));
+        }
+    }
     /**
      * Outputs the "hey, this isn't a known bundle message" and then exit()'s.
      *
@@ -1726,6 +1965,35 @@ php ArcTool.php show bundle_name start num
 php ArcTool.php show double_index_name which_bundle start num
     /* outputs items start through num from bundle_name or name of Yioop or
        non-Yioop archive crawl folder */
+php ArcTool.php user-impressions [key] [partition] [limit]
+    /* displays impression log records from the ImpressionRecords log folder
+       for a given user_id (key). If key is omitted, records for all users
+       are returned. Optionally, you can specify which partition to read from
+       and how many records to display. If no partition or limit is given,
+       it will scan all partitions and return all matching records.
+     */
+
+php ArcTool.php item-impressions item_id [partition] [limit]
+    /* displays impression log records from the ImpressionRecords log folder
+       for a given item_id. Optionally, you can specify
+       which partition to read from and how many records to display.
+       If no partition or limit is given, it will scan all partitions and
+       display all matching records for that item_id.
+     */
+
+php ArcTool.php user-items user_id
+    /* List all item IDs for the specified user. */
+
+php ArcTool.php group-items group_id
+    /* List all item IDs for the specified group. */
+
+php ArcTool.php type-items item_type
+    /*
+     * Lists all ITEM_IDs matching the specified type. The type may be
+     * given as a name (thread, wiki, group, query, cache, resource)
+     * or as its numeric code (1–6). Outputs one ID per line, or
+     * "No Items of that type" if none are found.
+     */
 EOD;
         exit();
     }
diff --git a/src/models/ImpressionModel.php b/src/models/ImpressionModel.php
index 9e81728d1..c35756572 100644
--- a/src/models/ImpressionModel.php
+++ b/src/models/ImpressionModel.php
@@ -32,6 +32,9 @@ namespace seekquarry\yioop\models;
 
 use seekquarry\yioop\configs as C;
 use seekquarry\yioop\library as L;
+use seekquarry\yioop\library\PackedTableTools;
+use seekquarry\yioop\library\PartitionDocumentBundle;
+C\nsconddefine("SIZE_LIMIT", 1000);
 
 /**
  * Model used to keep track for analytic and user experience activities that
@@ -79,7 +82,7 @@ class ImpressionModel extends Model
      * @param int $type_id type of particular item we are adding analytic
      *      information of (group, wiki, thread, etc)
      * @param int $time optional UNIX timestamp to use for impression
-            tracking; if set to -1 or omitted, the current time will be used
+     *      tracking; if set to -1 or omitted, the current time will be used
      */
     public function add($user_id, $item_id, $type_id, $time = -1)
     {
@@ -282,33 +285,129 @@ class ImpressionModel extends Model
         $dbinfo = ["DBMS" => C\DBMS, "DB_HOST" => C\DB_HOST,
             "DB_USER" => C\DB_USER, "DB_PASSWORD" => C\DB_PASSWORD,
             "DB_NAME" => C\DB_NAME];
-        $timestamps = [C\ONE_HOUR => floor(time()/C\ONE_HOUR) * C\ONE_HOUR,
-            C\ONE_DAY => floor(time()/C\ONE_DAY) * C\ONE_DAY,
-            C\ONE_MONTH => floor(time()/C\ONE_MONTH) * C\ONE_MONTH,
-            C\ONE_YEAR => floor(time()/C\ONE_YEAR) * C\ONE_YEAR];
+        $timestamps = [
+            C\ONE_HOUR => (int) floor(time() / C\ONE_HOUR) * C\ONE_HOUR,
+            C\ONE_DAY => (int) floor(time() / C\ONE_DAY) * C\ONE_DAY,
+            C\ONE_MONTH => (int) floor(time() / C\ONE_MONTH) * C\ONE_MONTH,
+            C\ONE_YEAR => (int) floor(time() / C\ONE_YEAR) * C\ONE_YEAR,
+        ];
         $one_week_stamp = C\ONE_WEEK * floor(time()/C\ONE_WEEK);
-        $table = "ITEM_IMPRESSION";
-        $condition = " VIEW_DATE >= ? AND ITEM_ID IS NOT NULL ";
-        $sum = " COUNT(*) ";
-        foreach ($timestamps as $period => $timestamp) {
-            $sql = "DELETE FROM ITEM_IMPRESSION_SUMMARY
-                WHERE UPDATE_PERIOD = ? AND UPDATE_TIMESTAMP = ?";
-            $db->execute($sql, [$period, $timestamp]);
-            $sql = "INSERT INTO ITEM_IMPRESSION_SUMMARY (USER_ID, ITEM_ID,
-                ITEM_TYPE, UPDATE_PERIOD, UPDATE_TIMESTAMP, NUM_VIEWS)
-                SELECT USER_ID, ITEM_ID, ITEM_TYPE, ? AS UPDATE_PERIOD,
-                ? AS UPDATE_TIMESTAMP, $sum AS NUM_VIEWS
-                FROM $table
-                WHERE $condition
-                GROUP BY USER_ID, ITEM_ID, ITEM_TYPE";
-            L\crawlLog( "Computing statistics for $period " .
+        $pdb = new L\PartitionDocumentBundle(
+            C\IMPRESSION_LOG_DIR,
+            ['PRIMARY KEY'=>'user_id','item_id'=>'INT','type_id'=>'INT',
+            'timestamp'=>'INT']
+        );
+        $max_part = $pdb->parameters['SAVE_PARTITION'] ?? 0;
+        $db->beginTransaction();
+        $prev_period = null;
+        foreach ($timestamps as $period => $aligned_timestamp) {
+            $db->execute(
+                'DELETE FROM ITEM_IMPRESSION_SUMMARY WHERE UPDATE_PERIOD = ? '
+                . 'AND UPDATE_TIMESTAMP = ?',
+                [$period, $aligned_timestamp]
+            );
+            if ($period === C\ONE_HOUR) {
+                $cutoff_part = null;
+                for ($part = $max_part; $part >= 0; $part--) {
+                    $index = $pdb->loadPartitionIndex(
+                        $part,
+                        false,
+                        PackedTableTools::APPEND_MODE
+                    );
+                    if (empty($index)) {
+                        continue;
+                    }
+                    $max_timestamp = 0;
+                    foreach ($index as $packed) {
+                        $rows = $pdb->table_tools->unpack($packed);
+                        foreach ($rows as $r) {
+                            if ($r['timestamp'] > $max_timestamp) {
+                                $max_timestamp = $r['timestamp'];
+                            }
+                        }
+                    }
+                    // partitions are scanned newest first; stop at the
+                    // first one whose records all predate this period
+                    if ($max_timestamp < $aligned_timestamp) {
+                        break;
+                    }
+                    $cutoff_part = $part;
+                }
+                if ($cutoff_part === null) {
+                    L\crawlLog("No relevant partition found");
+                } else {
+                    L\crawlLog("Cutoff partition: $cutoff_part");
+                    $summary = [];
+                    for ($part = $cutoff_part; $part <= $max_part; $part++) {
+                        $index = $pdb->loadPartitionIndex($part, false,
+                            PackedTableTools::APPEND_MODE);
+                        foreach ($index as $user_id => $packed_row) {
+                            $rows = $pdb->table_tools->unpack($packed_row);
+                            foreach ($rows as $row) {
+                                if ($row['timestamp'] >= $aligned_timestamp) {
+                                    $iid = $row['item_id'];
+                                    $tid = $row['type_id'];
+                                    $key = $user_id . '-' . $iid . '-' . $tid;
+                                    if (isset($summary[$key])) {
+                                        $summary[$key]['num_views']++;
+                                    } else {
+                                        $summary[$key] = [
+                                            'user_id' => $user_id,
+                                            'item_id' => $row['item_id'],
+                                            'item_type' => $row['type_id'],
+                                            'num_views' => 1,
+                                        ];
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    $chunks = array_chunk($summary, 500);
+                    L\crawlLog('Inserting ' . count($chunks) . ' chunks');
+                    foreach ($chunks as $chunk) {
+                        $rows_sql = [];
+                        foreach ($chunk as $s) {
+                            $uid = (int)$s['user_id'];
+                            $iid = (int)$s['item_id'];
+                            $itype = (int)$s['item_type'];
+                            $num_views = (int)$s['num_views'];
+                            $rows_sql[] = "($uid,$iid,$itype,$period,"
+                                . "$aligned_timestamp,$num_views)";
+                        }
+                        if ($rows_sql) {
+                            $values = implode(',', $rows_sql);
+                            $sql = 'INSERT INTO ITEM_IMPRESSION_SUMMARY '
+                                . '(USER_ID, ITEM_ID, ITEM_TYPE, '
+                                . 'UPDATE_PERIOD, UPDATE_TIMESTAMP, '
+                                . 'NUM_VIEWS) VALUES '
+                                . $values;
+                            $db->execute($db->insertIgnore($sql, $dbinfo));
+                        }
+                    }
+                }
+            } else {
+                $sql = 'INSERT INTO ITEM_IMPRESSION_SUMMARY'
+                    . ' (USER_ID, ITEM_ID, ITEM_TYPE,'
+                    . ' UPDATE_PERIOD, UPDATE_TIMESTAMP,'
+                    . ' NUM_VIEWS)'
+                    . ' SELECT USER_ID, ITEM_ID, ITEM_TYPE,'
+                    . ' ? AS UPDATE_PERIOD,'
+                    . ' ? AS UPDATE_TIMESTAMP, SUM(NUM_VIEWS)'
+                    . ' FROM ITEM_IMPRESSION_SUMMARY'
+                    . ' WHERE UPDATE_PERIOD = ?'
+                    . ' AND UPDATE_TIMESTAMP >= ?'
+                    . ' GROUP BY USER_ID, ITEM_ID, ITEM_TYPE';
+                $db->execute(
+                    $db->insertIgnore($sql, $dbinfo),
+                    [$period, $aligned_timestamp, $prev_period,
+                    $aligned_timestamp]
+                );
+                L\crawlLog( "Computing statistics for $period " .
                 "second update period");
-            $sql = $db->insertIgnore($sql, $dbinfo);
-            $db->execute($sql, [$period, $timestamp, $timestamp]);
-            $table = "ITEM_IMPRESSION_SUMMARY";
-            $condition = "UPDATE_PERIOD = $period AND UPDATE_TIMESTAMP >= ?";
-            $sum = "SUM(NUM_VIEWS)";
+            }
+            $prev_period = $period;
         }
+        $db->commit();
     }
     /**
      * Deletes user summary data older than one year, delete impression
@@ -602,13 +701,64 @@ class ImpressionModel extends Model
         $dbinfo = ["DBMS" => C\DBMS, "DB_HOST" => C\DB_HOST,
             "DB_USER" => C\DB_USER, "DB_PASSWORD" => C\DB_PASSWORD,
             "DB_NAME" => C\DB_NAME];
-        $sql = "INSERT INTO ITEM_IMPRESSION VALUES (?, ?, ?, ?)";
-        if ($user_id != C\PUBLIC_USER_ID) {
-            $db->execute($sql, [$user_id, $item_id, $type_id,
-                $input_timestamp]);
+        $log_folder = C\IMPRESSION_LOG_DIR;
+        $buffer_file = $log_folder . '/impression_buffer.log';
+        $state_file = $log_folder . '/buffer_count.count';
+        if (!is_dir($log_folder)) {
+            mkdir($log_folder, 0777, true);
+        }
+        $line = implode(',', [
+            (int)$user_id,
+            (int)$item_id,
+            (int)$type_id,
+            (int)$input_timestamp
+        ]) . "\n";
+        /* The lock on the counter file is held across the append, the
+           count update, and any flush so that concurrent requests can
+           neither lose buffered lines nor flush the same lines twice.
+         */
+        $counter_fp = fopen($state_file, 'c+');
+        if ($counter_fp !== false && flock($counter_fp, LOCK_EX)) {
+            file_put_contents($buffer_file, $line, FILE_APPEND);
+            $new_count = (int)trim((string)fread($counter_fp, 20)) + 1;
+            if ($new_count >= C\SIZE_LIMIT) {
+                $lines = file(
+                    $buffer_file,
+                    FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES
+                ) ?: [];
+                $rows = [];
+                foreach ($lines as $l) {
+                    $fields = explode(',', $l);
+                    if (count($fields) != 4) {
+                        continue;
+                    }
+                    $rows[] = [
+                        'user_id' => (int)$fields[0],
+                        'item_id' => (int)$fields[1],
+                        'type_id' => (int)$fields[2],
+                        'timestamp' => (int)$fields[3]
+                    ];
+                }
+                $format = [
+                    'PRIMARY KEY' => ['user_id', 0],
+                    'item_id' => 'INT',
+                    'type_id' => 'INT',
+                    'timestamp' => 'INT'
+                ];
+                $pdb = new L\PartitionDocumentBundle($log_folder, $format);
+                $pdb->put($rows);
+                file_put_contents($buffer_file, '');
+                $new_count = 0;
+            }
+            rewind($counter_fp);
+            ftruncate($counter_fp, 0);
+            fwrite($counter_fp, (string)$new_count);
+            fflush($counter_fp);
+            flock($counter_fp, LOCK_UN);
+        }
+        if ($counter_fp !== false) {
+            fclose($counter_fp);
         }
-        $db->execute($sql, [C\PUBLIC_USER_ID, $item_id, $type_id,
-            $input_timestamp]);
         $sql = "DELETE FROM ITEM_IMPRESSION_SUMMARY
             WHERE USER_ID=? AND ITEM_ID=? AND ITEM_TYPE=?
             AND UPDATE_PERIOD = ". C\MOST_RECENT_VIEW;
-- 
2.46.0.windows.1