createTables(); } function put($ARR){ global $_DB; $data = [ 'from' => $ARR['from_email'], 'to' => $ARR['to'], 'cc' => $ARR['email_cc'], 'bcc' => $ARR['email_bcc'], 'body' => $ARR["messaggio"], 'subject' => $ARR["oggetto"], 'payload_indi_email' => serialize($ARR), 'priority' => $this->priority, 'datetime' => date("Y-m-d H:s") ]; $_DB->insert($this->table, $data); // in locale non c'รจ il cron, stimolo lo scodamento subito if($DATI['dove_sono'] == 'loc') indi_email(['mailQueueProcess' => 1]); } function startEmailDequeue(){ global $_DB; $unassigedList = $this->getUnassignedQueueList(); if($unassigedList){ $process = $this->setQueueProcess(); if($process){ $this->setProcessToQueued($process); $list = $this->getAssignedQueueList($process); if($list){ foreach($list as $email){ $payload = $this->prepareIndiEmailPayload($email); if($payload){ sleep($this->delay); $this->sendEmail($email['id'], $payload, $process); } } } $this->endQueueProcess($process); } } } function getAssignedQueueList($process){ global $_DB; return $_DB->qa("SELECT * FROM $this->table WHERE status = 0 AND process = '$process' ORDER BY priority ASC, id ASC"); } function getUnassignedQueueList(){ global $_DB; return $_DB->qa("SELECT * FROM $this->table WHERE process IS NULL ORDER BY priority ASC, id ASC"); } function getQueueCount(){ global $_DB; return $_DB->qrs("SELECT count(*) FROM $this->table"); } function setQueueProcess(){ global $_DB; if(!$this->existsStartedProcess()){ $_DB->insert($this->processesTable, ['start' => date("Y-m-d H:s"), 'status' => "started"]); return $_DB->insert_id; } return false; } function endQueueProcess($process){ global $_DB; return $_DB->update($this->processesTable, ['end' => date("Y-m-d H:s"), 'status' => "complete"], $process); } function setProcessToQueued($process){ global $_DB; $_DB->q("UPDATE $this->table SET process = '$process' WHERE process IS NULL"); } function prepareIndiEmailPayload($email){ if($email['payload_indi_email']){ return unserialize($email['payload_indi_email']); } } function dequeueProcessEmails($process){ global $_DB; if($_DB->q(" INSERT INTO $this->dequeueTable SELECT * FROM $this->table WHERE process = '$process' AND status = 1 ")){ $_DB->q(" DELETE FROM $this->table WHERE process = '$process' AND status = 1 "); } } function dequeueEmail($id){ global $_DB; if($_DB->q(" INSERT INTO $this->dequeueTable SELECT * FROM $this->table WHERE id = '$id' AND status = 1 ")){ $_DB->q(" DELETE FROM $this->table WHERE id = '$id' AND status = 1 "); } } function sendEmail($id, $payload, $process){ global $_DB; $this->setCurrentSendingEmailId($id); // non togliere mai, altrimenti la mail viene inserita in coda in loop $payload['noQueue'] = true; // test // $payload['messaggio'] = ""; // $payload['oggetto'] = ""; if(indi_email($payload)){ $_DB->update($this->table, ['sent_datetime' => date("Y-m-d H:s"), 'process' => $process, 'status' => 1], $id); $_DB->update($this->processesTable, ['last_action' => date("Y-m-d H:s")], $process); $this->dequeueEmail($id); }else{ $_DB->update($this->table, ['process' => null, 'status' => null], $id); } $this->unsetCurrentSendingEmailId(); } function existsStartedProcess(){ global $_DB; $active = $_DB->qr("SELECT *, TIME_TO_SEC(TIMEDIFF(NOW(), last_action)) AS seconds_from_last_action FROM $this->processesTable WHERE status = 'started'"); if(!$active) return false; if($active['seconds_from_last_action'] > 60 || !$active['seconds_from_last_action']){ $this->endQueueProcess($active['id']); return false; }else{ return true; } } function checkQueue(){ global $_DB, $DATI; if($this->getQueueCount() > $this->$queueLimitAlert){ mail("lp@tnx.it", "Alert coda email > ".$this->$queueLimitAlert." ".$DATI['nome_sito'], ""); } } function dailyStatistics($day = null){ global $_DB; if($day == null) $day = date("Y-m-d",strtotime("-1 days")); $sentEmail = $_DB->qrs("SELECT count(*) FROM $this->dequeueTable WHERE DATE_FORMAT(sent_datetime, '%Y-%m-%d') = '$day'"); mail("lp@tnx.it", "Email inviate il $day da ".$DATI['nome_sito'].": $sentEmail", ""); } function setCurrentSendingEmailId($id){ $this->currentSendingEmailId = $id; } function unsetCurrentSendingEmailId(){ $this->currentSendingEmailId = null; } function addError($id, $type, $array){ global $_DB; if($array['error']){ $errorsText = $_DB->qrs("SELECT errors FROM $this->table WHERE id = '$id'"); $errorsText = date("d-m-Y H:s")." - ".$array['error']."\n".$errorsText; $_DB->update($this->table, ['errors' => $errorsText], $id); } } function createTables(){ global $_DB; $_DB->q(" CREATE TABLE IF NOT EXISTS $this->table ( `id` int(11) NOT NULL AUTO_INCREMENT, `body` text, `tpl` text, `subject` text, `from` varchar(255) DEFAULT NULL, `to` varchar(512) DEFAULT NULL, `cc` varchar(512) DEFAULT NULL, `bcc` varchar(512) DEFAULT NULL, `page` varchar(255) DEFAULT NULL, `priority` int(11) NOT NULL DEFAULT '100', `datetime` datetime DEFAULT NULL, `payload_indi_email` text, `sent_datetime` datetime DEFAULT NULL, `process` int(11) DEFAULT NULL, `status` int(1) NOT NULL DEFAULT '0', `errors` text, PRIMARY KEY (`id`) ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; "); $_DB->q(" CREATE TABLE IF NOT EXISTS $this->dequeueTable ( `id` int(11) NOT NULL, `body` text, `tpl` text, `subject` text, `from` varchar(255) DEFAULT NULL, `to` varchar(512) DEFAULT NULL, `cc` varchar(512) DEFAULT NULL, `bcc` varchar(512) DEFAULT NULL, `page` varchar(255) DEFAULT NULL, `priority` int(11) NOT NULL DEFAULT '100', `datetime` datetime DEFAULT NULL, `payload_indi_email` text, `sent_datetime` datetime DEFAULT NULL, `process` int(11) DEFAULT NULL, `status` int(1) NOT NULL DEFAULT '0', `errors` text, PRIMARY KEY (`id`) ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; "); $_DB->q(" CREATE TABLE IF NOT EXISTS $this->processesTable ( `id` int(11) NOT NULL AUTO_INCREMENT, `start` datetime DEFAULT NULL, `end` datetime DEFAULT NULL, `last_action` datetime DEFAULT NULL, `status` enum('complete','started') DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; "); } } ?>