
Data collection and aggregation script


Introduction

The aggregation script collects dataset usage records from various streams, such as AAA, EOS, CMSSW and JobMonitoring (CRAB). It then keeps only certain attributes and groups the records by dataset name and site name. The SQL count function is used to count how many times a given dataset was accessed at a given site and how many unique users accessed it.
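
To illustrate the idea, here is a minimal PySpark sketch of the grouping step. The column names (dataset_name, site_name, user) and the toy input are assumptions for illustration only and do not reflect the actual schemas used by data_aggregation.py:

# Minimal sketch: group usage records by dataset and site, then count
# accesses and distinct users. Column names are illustrative assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, countDistinct

spark = SparkSession.builder.appName("aggregation-sketch").getOrCreate()

records = spark.createDataFrame(
    [("/A/B/RAW", "T2_US_X", "alice"),
     ("/A/B/RAW", "T2_US_X", "bob"),
     ("/A/B/RAW", "T2_US_X", "alice")],
    ["dataset_name", "site_name", "user"])

aggregated = (records
    .groupBy("dataset_name", "site_name")
    .agg(count("*").alias("nacc"),
         countDistinct("user").alias("distinct_users")))
aggregated.show()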

Input

Data for the CMSSW stream is taken from /project/awg/cms/cmssw-popularity/avro-snappy.

Data for the AAA stream is taken from /project/monitoring/archive/xrootd/enr/gled rather than from the raw area (as in the aaa_tables function in spark_utils.py), because records in the enr directory contain src_experiment_site and dst_experiment_site. src_experiment_site is used as the site_name attribute. A custom location for the AAA stream can be specified with the --aaa_hdir parameter.

Data for the EOS stream is taken from /project/monitoring/archive/eos/logs/reports/cms. The EOS stream does not carry a site name, so it is obtained by joining the EOS table with a temporary file_block_site table by file name.

Data for the JobMonitoring stream is taken from /project/awg/cms/jm-data-popularity/avro-snappy.
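
A hedged sketch of how these inputs could be loaded for a single day is shown below; the real logic lives in spark_utils.py, and both the day subdirectory layout and the need for the external spark-avro package are assumptions here:

# Sketch only: load one day of CMSSW (Avro) and AAA (JSON) records.
# Paths below append an assumed YYYY/MM/DD layout to the base directories.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("input-sketch").getOrCreate()

# CMSSW popularity records are stored as snappy-compressed Avro
# (requires the spark-avro package to be available)
cmssw = spark.read.format("avro").load(
    "hdfs:///project/awg/cms/cmssw-popularity/avro-snappy/2017/09/15")

# AAA records in the enr area are JSON documents
aaa = spark.read.json(
    "hdfs:///project/monitoring/archive/xrootd/enr/gled/2017/09/15")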

file_block_site table

The file_block_site table is created by joining the fdf, bdf and ddf tables to get records containing file name, block name and dataset name. The PhEDEx table is then joined on block name; only the site name is taken from it. In the SQL query the PhEDEx site name is preprocessed to contain only the tier, country and lab/university name (e.g. both T0_CH_CERN_MSS and T0_CH_CERN_Export are converted to T0_CH_CERN). After this query is done, duplicate rows are eliminated by running a separate SQL query with SELECT DISTINCT *.
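
The following Spark SQL sketch illustrates the shape of this query; fdf, bdf, ddf and phedex are assumed to be registered temporary views, and the column names used here are assumptions that may differ from the real schemas in spark_utils.py:

# Sketch only: build file_block_site by joining file, block, dataset and
# PhEDEx views, keeping the tier_country_lab prefix of the PhEDEx node name.
query = """
SELECT DISTINCT
       f.f_logical_file_name AS file_name,
       b.b_block_name        AS block_name,
       d.d_dataset           AS dataset_name,
       regexp_extract(p.node_name, '^(T[0-9]+_[A-Z]+_[A-Za-z0-9]+)', 1) AS site_name
FROM fdf f
JOIN bdf b    ON f.f_block_id   = b.b_block_id
JOIN ddf d    ON f.f_dataset_id = d.d_dataset_id
JOIN phedex p ON b.b_block_name = p.block_name
"""
file_block_site = spark.sql(query)
file_block_site.createOrReplaceTempView("file_block_site")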

Output records

After aggregation for each stream is finished, all aggregated records are joined into a single table which is then exported in CSV format to an output directory. The output directory path is made of the path specified by the --fout parameter plus subdirectories for year, month and day. For example, if --fout is /cms/users/user/ and the date is 2017-02-01, then the full output path would be /cms/users/user/2017/02/01.

If the output directory already exists, it and all files and directories inside it will be deleted before the new records are exported there.
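
A small Python sketch of how the final path could be derived (the parameter handling here is illustrative, not the actual code in data_aggregation.py):

# Sketch only: build <fout>/YYYY/MM/DD from a YYYYMMDD date string.
import os

def output_path(fout, date):
    year, month, day = date[0:4], date[4:6], date[6:8]
    return os.path.join(fout, year, month, day)

print(output_path("/cms/users/user", "20170201"))  # /cms/users/user/2017/02/01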

Output records are made of these attributes:

  • Site name - name of the site where the dataset is located.
  • Dataset name - name of the dataset.
  • Number of accesses - how many times this dataset was accessed from this site (site name).
  • Distinct users - number of distinct users that accessed this dataset from this site.
  • Source - stream name from where this record is taken (AAA, CMSSW, EOS, JobMonitoring).
  • Timestamp - date (date only, no time information) as a UNIX timestamp in milliseconds in GMT.
  • Site tier - site tier. Same as the first part of the site name. Does not provide any new information; added only to ease filtering/grouping by tier.
  • CPU time - sum of ExeCPU values. If CPU time is not available, -1 is used.
  • primds, procds, tier - dataset name split into separate parts to simplify certain operations on the output.
Example (spaces added for easier reading):
site_name, dataset_name,                                nacc, distinct_users, stream, timestamp,     site_tier, cpu_time, primds,       procds,                tier
T2_US,     /SinglePhoton/Run2016C-15Sep2016-v5/MINIAOD,    9,              1, cmssw,  1485903600000, T2,             123, SinglePhoton, Run2016C-15Sep2016-v5, MINIAOD
Explanation:

/SinglePhoton/Run2016C-15Sep2016-v5/MINIAOD was accessed 9 times by 1 user at the T2_US site (T2 tier). The data was taken from the cmssw stream for 2017-02-01 (1485903600000). The sum of CPU time is 123.

Running using bash script

The aggregation workflow is run by the run_aggregation script. Here we describe the steps to run it.

# location of CMSSpark on your system
wdir=/path/CMSSpark
export PATH=$wdir/bin:$PATH
export PYTHONPATH=$wdir/src/python:$PYTHONPATH
CMSSPARK_CONF=$wdir/etc/conf.json
# run the run_aggregation script for a given date
run_aggregation $CMSSPARK_CONF 20170928 > 20170928.log 2>&1 &

Bash script working algorithm

First the bash script finds the newest available AAA, EOS, CMSSW and JobMonitoring logs and saves their dates in YYYYMMDD format. This is achieved by recursively listing directories in the Hadoop file system. The script then checks whether the dates for all sources are equal. If they are, it checks whether output for that date already exists. If it does, the script quits, because it was already run for this day; if it does not, the script runs for the given date. Note that if the newest data are available for 2017-09-15, 2017-09-15, 2017-09-15 and 2017-09-14, the script will not run at all rather than running for 2017-09-14!
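
A hedged Python sketch of this decision logic (the real script lists HDFS directories; this only mirrors the checks described above):

# Sketch only: decide whether to run aggregation, given the newest date
# found for each stream and the set of dates that already have output.
def should_run(aaa, eos, cmssw, jm, existing_outputs):
    dates = {aaa, eos, cmssw, jm}
    if len(dates) != 1:            # streams are not in sync yet
        return None
    date = dates.pop()
    if date in existing_outputs:   # aggregation already done for this day
        return None
    return date                    # run aggregation for this date

print(should_run("20170915", "20170915", "20170915", "20170914", set()))  # None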

Running individual steps

The run_aggregation script consists of two steps:

  • the aggregation part, which collects information from various HDFS streams
  • the MONIT submission part, which submits the aggregated information from the previous step into the CERN MONIT system

If necessary, these steps can be run separately.
Aggregation part

The script works on a single day at a time. The date must be specified with the --date flag in YYYYMMDD format. The output directory must be specified with the --fout parameter. Note that the output will be saved to <output directory>/YYYY/MM/DD. Aggregation:

run_spark data_aggregation.py --yarn --date 20170915 --fout hdfs:///cms/users/vk/agg

Data upload to MONIT

run_spark cern_monit.py --hdir hdfs:///cms/users/vk/agg/20170915 --stomp=<path to stomp> --amq <path to credentials json> --aggregation_schema

Both scripts have an optional --verbose parameter that enables verbose output.

--aggregation_schema is needed in order to use the correct schema so that data types (string, int, long) are correct.

Setting up crontabs

There are four different cron jobs you may set up:

0 */4 * * * /data/cms/CMSSpark/bin/cron4aggregation
0 */2 * * * /data/cms/CMSSpark/bin/cron4phedex
0 */3 * * * /data/cms/CMSSpark/bin/cron4dbs_condor
0 21 * * * /data/cms/CMSSpark/bin/cron4phedex_df /data/cms/phedex

The first one, cron4aggregation, is responsible for fetching data from various HDFS data streams, aggregating them and sending the results to MONIT. Underneath it runs the run_aggregation script. The second cron job, cron4phedex, parses the block-replicas-snapshots area on HDFS and collects all PhEDEx replica info from various CMS sites. It stores its results in the hdfs:///cms/phedex area. The third cron job, cron4dbs_condor, aggregates various metrics from the DBS snapshot and HTCondor logs on HDFS. The results are stored in the hdfs:///cms/dbs_condor area. There are four streams aggregated so far over there: dataset, campaign, era, release. Finally, the cron4phedex_df cron job collects PhEDEx dataframes from HDFS, e.g. hdfs:///cms/phedex/2018/01/01, and puts the results into the provided local disk area /data/cms/phedex/20180101. It then merges the last set of dataframes, covering one year of data, into a phedex_YYYYMMDD_YYYYMMDD.csv.gz dataframe.
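
For the merge step, a hedged Python sketch is shown below; the file layout under /data/cms/phedex and the use of pandas are assumptions for illustration only:

# Sketch only: merge one year of daily PhEDEx CSV dataframes into a single
# compressed CSV, assuming daily files live under /data/cms/phedex/YYYYMMDD/.
import glob
import pandas as pd

files = sorted(glob.glob("/data/cms/phedex/2017????/*.csv.gz"))
frames = [pd.read_csv(f, compression="gzip") for f in files]
merged = pd.concat(frames, ignore_index=True)
merged.to_csv("phedex_20170101_20171231.csv.gz",
              index=False, compression="gzip")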