nohup bash 스크립트의 nodejs 하위 프로세스가 멈추는 문제

nohup bash 스크립트의 nodejs 하위 프로세스가 멈추는 문제

나는 nohup을 사용하여 수백에서 수천 개의 Nodejs 명령 목록이 포함된 스크립트를 실행합니다. 이러한 nodejs 하위 프로세스는 mysql 및 Salesforce의 데이터를 Couchdb로 동기화합니다.

$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh

스크립트:

#!/bin/bash
# Driver script: runs the mf-sync.js commands listed below one after another.
# No command is backgrounded, so each `node` call has to exit before the next
# one starts.
echo "Starting..."
# Print the PID of this shell ($$)
echo "pid $$"
# One synchronization run per --mfi/--source pair
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....

터미널에서 하위 프로세스의 실행이 중지되는 것을 확인했습니다.

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ kill 6333

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6423 ?        00:00:00 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6449 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

참고: 30097은 nohup 프로세스의 pid입니다.

자식 프로세스를 종료하기 전과 종료한 후의 로그를 확인해 보니, 종료 후에는 다음 nodejs 명령이 순차적으로 실행되는 것을 볼 수 있습니다. 자세한 출력을 위해 --debug 플래그를 사용하여 실행해 보았지만 예외는 발견되지 않았습니다.

보충 설명

  • Nodejs의 메모리 제한은 1GB입니다.
  • Couchdb의 기본 최대 연결 수는 2048입니다.
  • mf-sync.js의 내용:

    #!/usr/bin/env node
    // Set the process title to 'mf-sync'
    process.title = 'mf-sync';
    
    var path = require('path')
    ,   fs = require('fs')
    ,   _ = require('underscore');
    
    // Parse command-line arguments
    // Each argument after the first two entries of process.argv has its first
    // `--` removed and is split on `=`: `--key=value` becomes args.key = 'value',
    // and a bare `--flag` becomes args.flag = true.
    var args = _.chain(process.argv).rest(2).map(function(arg) {
        arg = arg.replace('--', '').split('=');
        _.size(arg) === 1 && arg.push(true);
        return arg;
    }).object().value();
    
    // Both identifiers are mandatory; the script aborts without them
    if (!args.mfi) throw new Error('MFI ID not specified');
    if (!args.source) throw new Error('Source ID not specified');
    
    // Output when using `--debug` flag
    // Forwards all of its arguments to console.info; prints nothing otherwise
    var debug = function() { if (_.has(args, 'debug')) console.info.apply(this, arguments); };
    
    // Simulation mode
    // True when `--simulate` was passed
    var simulate = _.has(args, 'simulate');
    
    require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started');
    simulate && console.warn('Simulation mode enabled. No changes will occurr.');
    debug(args);
    
    // Load MySQL configuration
    // mysql.json (next to this script) is indexed by configuration name:
    // 'mfdb' is used for source 101027 and 'gold' for every other source.
    var my = require('mysql');
    var myConfig = require(path.join(__dirname, 'mysql.json'));
    var db = 'gold';
    if (args.source == '101027') db = 'mfdb';
    var mysql = my.createConnection(myConfig[db]);
    debug('MySQL', myConfig[db].database);
    
    // Load Salesforce configuration
    // salesforce.json (next to this script) is passed as-is to the Connection constructor
    var sf = require('node-salesforce');
    var sfConfig = require(path.join(__dirname, 'salesforce.json'));
    var salesforce = new sf.Connection(sfConfig);
    debug('Salesforce', sfConfig.username);
    
    // Load CouchDB configuration
    // Host, port, options and database name all come from the `mfdb` entry of couchdb.json
    var cradle = require('cradle');
    var couchConfig = require(path.join(__dirname, 'couchdb.json'));
    var couch = new(cradle.Connection)(couchConfig.mfdb.host, couchConfig.mfdb.port, couchConfig.mfdb.options).database(couchConfig.mfdb.name);
    debug('CouchDB', couchConfig.mfdb.name);
    
    // Register `_.compactObject` on Underscore.js.
    // It deletes, in place, every entry of `obj` whose value is null or a
    // function, and returns that same object.
    _.mixin({
        compactObject: function(obj) {
            var isDiscardable = function(value) {
                return _.isNull(value) || _.isFunction(value);
            };
            _.each(obj, function(value, key) {
                if (!isDiscardable(value)) return;
                delete obj[key];
            });
            return obj;
        }
    });
    
    // Get MFI data from MySQL
    // -----------------------
    // Loads the master `mfi` row, every period table and the dimension tables for
    // one MFI and folds them into one document per period.
    //
    // mfi      - object carrying `source_id` and `mfi_id`. Columns of the matching
    //            `mfi` row are added to it for keys it does not already define.
    // callback - invoked as callback(mfi, docs) once the connection has ended.
    //            `docs` maps each document `_id`
    //            ('mfi-period/<source>/<mfi>/<currency>/<adjusted>/<year>/<period>/<date>')
    //            to its period document.
    //
    // Any query error is rethrown from inside the query callback.
    // NOTE(review): all queries and the final `end()` are issued up front; the
    // code depends on the mysql driver running them in the order issued (the
    // scenario tree must exist before dimension rows are processed) - confirm.
    var getMySQLData = function(mfi, callback) {
        mysql.connect();

        // Get master MFI metadata
        debug('Getting master MFI metadata from `mfi`.');
        mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
            if (err) throw new Error(err);
            // Only fill in keys the caller has not set; skip the `parse` and `_typeCast` entries
            _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
        });

        // Define MFDB data tables, keyed by '<currency>/<adjusted>'
        var tables = {
            'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
            'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
            'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
            'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
        };
        // Remove table name variance so every variant is stored under the same key
        var baseTable = _.memoize(function(table) {
            return table.replace('_usd', '').replace('_adjusted', '');
        });

        var docs = {};
        // Get all available MFDB data for the current `mfi_vid`
        debug('Getting all available MFDB data for the current `mfi_vid`.');
        _.each(_.keys(tables), function(key) {
            _.each(tables[key], function(table) {
                debug('Querying', key, 'data from', table);
                mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                    if (err) throw new Error(err);

                    // Create full document data
                    _.each(rows, function(row) {
                        // Create doc._id
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        debug('Processing', table, 'data for', doc_id);

                        // Initialize document
                        if (!docs[doc_id]) docs[doc_id] = {
                            _id: doc_id,
                            type: 'mfi-period',
                            currency: key.split('/')[0],
                            adjusted: key.split('/')[1] === 'true',
                            fiscal_year: row.fiscal_year,
                            period_type: row.period_type,
                            as_of_date: row.as_of_date
                        };
                        if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;

                        // Extend MFDB data into document, minus the identifying
                        // columns and any null values
                        debug('Adding', table, 'data to', doc_id);
                        row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
                        if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row;
                    });
                });
            });
        });

        // Get all scenario data to create dimension hierarchy
        var tree = {};
        mysql.query("SELECT * FROM scenarios", function(err, rows) {
            debug('Processing scenario data into hierarchical tree.');
            if (err) throw new Error(err);

            // Get all children scenarios for any given parent, as an object whose
            // keys are the child scenario names (null when there are none)
            var getChildren = function(parent) {
                var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
                    // Remove used scenarios from master list to decrease stack size
                    _.each(_.keys(scenarios), function(scenario) {
                        rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
                    });
                }).value();
                if (_.isEmpty(children)) return null;
                return children;
            };

            // Recursively get dimension hierarchy
            var getTree = function(hierarchy) {
                if (_.isEmpty(hierarchy)) return;
                _.each(_.keys(hierarchy), function(p) {
                    hierarchy[p] = getChildren(p);
                    if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
                });
            };

            // Root scenarios are the ones with an empty parent
            tree = getChildren('');
            getTree(tree);
        });

        // Find path to nested object property
        // The result is memoized on `needles` only
        var findPath = _.memoize(function(needles, haystack) {
            function constructPath(haystack, needle, path) {
                if (!_.isObject(haystack)) return false;
                if (typeof haystack !== 'object') return false;
                for (var key in haystack) {
                    var value = haystack[key];
                    var currentPath = _.extend([], path);
                    currentPath.push(key);
                    if (key === needle) return currentPath;
                    var foundPath = constructPath(value, needle, currentPath);
                    if (foundPath) return foundPath;
                }
            }
            // Handle comma-separated nested hierarchies
            return _.chain(needles.split(',')).map(function(needle) {
                return constructPath(haystack, needle, []);
            }).flatten().compact().value();
        });
        // Assign value inside a nested object property, creating intermediate
        // objects on the way. A scalar found where an object is needed is kept
        // under `value`; a final key that already holds an object receives `val`
        // under `value`.
        var deepAssign = function(obj, path, val) {
            // Plain index loop: the previous `for (var i = 0 in path)` form is a
            // syntax error in strict mode and ES modules
            for (var i = 0; i < path.length; i++) {
                var key = path[i];
                if (i === path.length - 1) {
                    if (typeof obj[key] === 'object') obj[key].value = val;
                    else obj[key] = val;
                } else if (typeof obj[key] !== 'object') {
                    obj[key] = _.isUndefined(obj[key]) ? {} : {value: obj[key]};
                }
                obj = obj[key];
            }
        };
        // Sanitize dimension names: drop the 'mix_', 'Dimension' and 'Member'
        // markers and keep only the part after ':' when there is one
        var sanitizeDimensions = _.memoize(function(dimensions) {
            return _.map(dimensions, function(dimension) {
                dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
                if (/:/.test(dimension)) return dimension.split(':')[1];
                else return dimension;
            });
        });

        // Get dimension data for all available documents
        _.each(['usd', 'local'], function(currency) {
            var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
            debug('Querying', currency, 'data from', dimensions_table);
            mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                debug('Processing all data from', dimensions_table);
                if (err) throw new Error(err);
                _.each(rows, function(row) {
                    var dimension_path = findPath(row.scenarios, tree);
                    if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
                    // The same dimension value goes into both the adjusted and the
                    // unadjusted document, when those documents exist
                    _.each(['true', 'false'], function(adjusted) {
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
                        docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value));
                    });
                });
            });
        });

        // Close the connection and hand the collected documents to the caller
        mysql.end(function(err) {
            debug('Disconnected from MySQL', db);
            if (err) throw new Error(err);
            callback(mfi, docs);
        });
    };
    
    // Get MFI metadata from Salesforce
    // --------------------------------
    // Logs in, runs four queries and merges their results into `mfi` (account
    // fields, `sp_profile`, `networks`) and into the annual documents of `docs`
    // (`annual_diamonds`). When the last query has answered, `callback(mfi, docs)`
    // is invoked and the Salesforce session is logged out. Any error reported by
    // Salesforce is rethrown from inside the corresponding callback.
    var getSalesforceData = function(mfi, docs, callback) {
        // Number of queries that have not answered yet
        var pending = 4;

        // Rethrow an error reported by a Salesforce call
        var failOn = function(err) {
            if (err) throw new Error(err);
        };

        // Each query calls this once; the last call hands over the results
        var done = function(mfi, docs) {
            pending -= 1;
            if (pending !== 0) return;

            callback(mfi, docs);

            // Logout from Salesforce
            salesforce.logout(function(err) {
                debug('Logged out from Salesforce');
                failOn(err);
            });
        };

        // Login into Salesforce
        debug('Login into Salesforce');
        salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) {
            failOn(err);

            // Get main MFI Metadata
            debug('Getting MFI metadata from Salesforce');
            var accountQuery = "SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "'";
            salesforce.query(accountQuery, function(err, result) {
                failOn(err);
                if (result.totalSize === 0) throw new Error('MFI does not exist');
                var account = _.omit(_.first(result.records), ['attributes', 'Id']);
                // Make attributes lowercase
                var lowered = {};
                _.each(account, function(value, field) {
                    lowered[field.toLowerCase()] = value;
                });
                _.extend(mfi, lowered);
                mfi.mfi_name = mfi.name;
                done(mfi, docs);
            });

            // Determine whether MFI contains Social Performance Profile data
            debug('Determining whether MFI contains SP Profile data.');
            var profileQuery = "SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)";
            salesforce.query(profileQuery, function(err, result) {
                failOn(err);
                var hasProfile = !_.isEmpty(result.records);
                mfi.sp_profile = hasProfile;
                done(mfi, docs);
            });

            // Get list of MFI Network Affiliations
            debug('Getting list of MFI Network Affiliations');
            var networkQuery = "SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + mfi.organization_id + "'";
            salesforce.query(networkQuery, function(err, result) {
                failOn(err);
                var organizations = _.pluck(result.records, 'Source_Organization__r');
                mfi.networks = _.pluck(organizations, 'Name');
                done(mfi, docs);
            });

            // Get annual diamonds
            debug('Getting annual diamonds.');
            var diamondQuery = "SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + mfi.organization_id + "'";
            salesforce.query(diamondQuery, function(err, result) {
                failOn(err);
                // Group diamonds by year: { <Period__c>: <Diamond_Score__c> }
                var pairs = _.map(result.records, function(period) {
                    return _.values(_.pick(period, ['Period__c', 'Diamond_Score__c']));
                });
                var diamonds = _.object(pairs);
                // Add diamonds to corresponding periods (annual documents only)
                _.each(docs, function(doc) {
                    if (doc.period_type !== 'ANN') return;
                    doc.annual_diamonds = diamonds[doc.fiscal_year];
                });
                done(mfi, docs);
            });
        });
    };
    
    // Calculate Peer Group data
    // -------------------------
    // Classifies every document in `docs` into peer groups (age, intermediation,
    // market, outreach, scale, sustainability) and stores the groups that could
    // be determined under `doc.peer_groups`. Documents for which nothing could be
    // determined are left untouched.
    //
    // docs     - object mapping document ids to period documents (modified in place)
    // callback - invoked synchronously as callback(docs) when all documents are done
    var calculatePeerGroupData = function(docs, callback) {
        // Safely get data point value. A data point is stored either as a bare
        // value or as an object that carries the value under `value`.
        var getVal = function(obj, group, prop) {
            if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
            var point = obj[group][prop];
            // Test for the key, not for truthiness, so that a stored 0 is returned
            // as 0 instead of the wrapping object
            if (point !== null && typeof point === 'object' && _.has(point, 'value')) return point.value;
            return point;
        };

        _.each(docs, function(doc, id) {
            var peer_groups = {};

            // Age, in years between establishment and the reporting date
            debug('Calculating peer group age for', doc._id);
            if (_.has(doc, 'date_established__c')) {
                var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
                // Only unparseable dates are skipped; an age of exactly 0 is 'New'
                if (!isNaN(age)) {
                    if (age < 4) peer_groups['age'] = 'New';
                    else if (age <= 8) peer_groups['age'] = 'Young';
                    else peer_groups['age'] = 'Mature';
                }
            }

            // Intermediation, from the deposits to total assets ratio
            debug('Calculating peer group intermediation for', doc._id);
            var deposits = getVal(doc, 'balance_sheet', 'deposits');
            var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
            if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
                var ratio = deposits / total_assets;
                if (ratio === 0) peer_groups['intermediation'] = 'Non FI';
                else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI';
                else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI';
            }
            else if (total_assets === 0) {
                peer_groups['intermediation'] = 'Non FI';
            }

            // Market
            debug('Calculating peer group market for', doc._id);
            var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita');
            var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance');
            if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
                if (depth < 0.2 || average_loan_size < 150) peer_groups['market'] = 'Low End';
                else if ((depth >= 0.2) && (depth < 1.5)) peer_groups['market'] = 'Broad';
                else if ((depth >= 1.5) && (depth < 2.5)) peer_groups['market'] = 'High End';
                else if (depth >= 2.5) peer_groups['market'] = 'Small Business';
            }

            // Outreach (a missing borrower count matches none of the comparisons)
            debug('Calculating peer group outreach for', doc._id);
            var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
            if (total_borrowers < 10000) peer_groups['outreach'] = 'Small';
            else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium';
            else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large';

            // Scale; the thresholds are higher for Latin America and The Caribbean
            debug('Calculating peer group scale for', doc._id);
            if (_.has(doc, 'mix_region__c')) {
                var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
                if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small';
                else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium';
                // `>=` so that a portfolio of exactly 8,000,000 is classified too
                else if (gross_loan_portfolio >= 8000000) peer_groups['scale'] = 'Large';
            }

            // Sustainability; adjusted documents use the FSS labels, others OSS
            debug('Calculating peer group sustainability for', doc._id);
            var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
            if (!_.isUndefined(operational_self_sufficiency)) {
                if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
                else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
            }

            if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups;
        });

        callback(docs);
    };
    
    // Send data to CouchDB
    // --------------------
    // Writes the master MFI record and all of its period documents, and removes
    // period documents that exist in CouchDB but are no longer in `docs`.
    //
    // docs     - array whose first element is the master MFI record, followed by
    //            the period documents. The first element is shifted off the array.
    // callback - invoked without arguments once every write has been issued.
    //            NOTE(review): it does not wait for the writes to complete.
    //
    // In simulation mode every intended action is logged but nothing is saved or
    // removed. Errors reported by CouchDB are rethrown from inside the callbacks.
    var updateCouchDB = function(docs, callback) {
        // Update master MFI record
        debug('Updating master MFI record');
        var mfi = docs.shift();
        couch.get(mfi._id, function(err, doc) {
            if (err) {
                // A missing document is expected on first sync; anything else is fatal
                if (err.error === 'not_found') {
                    require('util').log('Inserting ' + mfi._id);
                    !simulate && couch.save(mfi._id, mfi, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                } else throw new Error(err);
            } else if (doc._rev) {
                require('util').log('Updating ' + mfi._id);
                !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) {
                    debug('Updated', res);
                    if (err) throw new Error(err);
                });
            }
        });

        // Get list of existing IDs in CouchDB
        debug('Getting list of existing IDs in CouchDB');
        couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) {
            if (err) throw new Error(err);

            // Remove outdated documents from CouchDB
            var outdatedIds = _.difference(_.pluck(ids, 'id'), _.pluck(docs, '_id'));
            _.each(outdatedIds, function(id) {
                var doc = _.findWhere(ids, {id: id});
                require('util').log('Removing ' + doc.id);
                // Guarded like the saves: simulation mode must not delete anything
                !simulate && couch.remove(doc.id, doc.value.rev, function(err, res) {
                    debug('Removed', res);
                    if (err) throw new Error(err);
                });
            });

            // Insert/update all documents for this MFI
            _.each(docs, function(doc) {
                // An existing row supplies the revision needed for an update
                var update = _.findWhere(ids, {id: doc._id});
                if (update) {
                    require('util').log('Updating ' + doc._id);
                    !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) {
                        debug('Updated', res);
                        if (err) throw new Error(err);
                    });
                } else {
                    require('util').log('Inserting ' + doc._id);
                    !simulate && couch.save(doc._id, doc, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                }
            });

            callback();
        });
    };
    
    // Seed for the master MFI document; it is handed to the MySQL and Salesforce
    // steps below
    var mfi = {};
    mfi._id = 'mfi/' + args.source + '/' + args.mfi;
    mfi.type = 'mfi';
    mfi.source_id = args.source;
    mfi.mfi_id = args.mfi;
    mfi.updated = new Date();

    // Final step: report that the run is over
    var reportFinished = function() {
        require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' finished');
    };

    // Step 2: runs once Salesforce metadata is available
    var afterSalesforce = function(mfi, docs) {
        // Merge MFI metadata into each period; a period's own fields win
        _.each(docs, function(doc, id) {
            var merged = _.clone(mfi);
            docs[id] = _.extend(merged, doc);
        });
        calculatePeerGroupData(docs, function(docs) {
            // Convert to array for bulk updating, master record first
            var batch = _.union([mfi], _.values(docs));
            updateCouchDB(batch, reportFinished);
        });
    };

    // Step 1: runs once MySQL data is available
    var afterMySQL = function(mfi, docs) {
        getSalesforceData(mfi, docs, afterSalesforce);
    };

    getMySQLData(mfi, afterMySQL);
    

질문

나는 알고 싶다:

  1. 이러한 하위 프로세스가 멈추는 이유는 무엇입니까? (멈추는 프로세스가 정상적으로 실행되고 종료되는 프로세스와 다르다는 증거를 찾을 수 없습니다.)
  2. 수동으로 종료할 필요가 없도록 몇 분 동안 정지되는 하위 프로세스를 중지하는 스크립트를 어떻게 작성할 수 있습니까?

답변1

추측이지만(이미 그렇게 짐작하고 계실 수도 있습니다) 어떤 리소스가 고갈되고 있는 것 같습니다. 최대 열린 파일 수일 수도 있고, mysql이나 Salesforce 쪽의 제한일 수도 있습니다. 정확히는 모르겠습니다.

하지만 이 문제를 해결할 수 있는 한 가지 방법은 mf-sync를 모듈로 만들고, mf-sync 명령을 수없이 나열한 스크립트 대신 대기열을 사용하여 mf-sync 작업을 실행하는 제어용 노드 스크립트를 사용하는 것입니다. 대기열을 일정한 크기의 배치로 나누어 처리하는 방식과 비슷합니다. 다음과 같은 것을 시도해 보세요: https://github.com/learnboost/kue

지금 하고 계신 방식은 다소 무리가 있어 보입니다. 하지만 열린 파일 수 한도에 도달한 것이 원인이라면 한도를 늘리는 것만으로도 해결될 수 있습니다. http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/

또는 실제로 이 작업을 셸에서 수행하려면 batch 명령을 사용하여 대기열을 처리할 수 있습니다. http://pubs.opengroup.org/onlinepubs/009695399/utilities/batch.html

아니면 이런 방법이 더 나을지도 모르겠네요: http://pebblesinthesand.wordpress.com/2008/05/22/a-srcipt-for-running-processes-in-parallel-in-bash/

관련 정보