File manager - Edit - /home/monara/public_html/test.athavaneng.com/Sending.tar
Back
ScheduledTaskSubscribersListingRepository.php 0000644 00000013561 15073230147 0015651 0 ustar 00 <?php declare(strict_types = 1);

namespace MailPoet\Newsletter\Sending;

if (!defined('ABSPATH')) exit;

use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Listing\ListingDefinition;
use MailPoet\Listing\ListingRepository;
use MailPoet\Util\Helpers;
use MailPoetVendor\Doctrine\ORM\QueryBuilder;

/**
 * Listing queries over scheduled task subscribers (alias "sts") left-joined
 * with their subscriber (alias "s").
 */
class ScheduledTaskSubscribersListingRepository extends ListingRepository {
  /**
   * Returns the listing groups ("all", sent, failed, unprocessed) with the
   * number of rows in each.
   *
   * All counts share one base query (FROM clause plus the definition's
   * parameters). The group conditions are taken from applyGroup(), so a
   * group's count is computed with exactly the filter that is applied when
   * that group is listed.
   *
   * @return array[] One entry per group with the keys 'name', 'label' and 'count'.
   */
  public function getGroups(ListingDefinition $definition): array {
    $baseQueryBuilder = clone $this->queryBuilder;
    $this->applyFromClause($baseQueryBuilder);
    $this->applyParameters($baseQueryBuilder, $definition->getParameters());

    return [
      [
        'name' => 'all',
        'label' => __('All', 'mailpoet'),
        'count' => $this->countSubscribersInGroup($baseQueryBuilder, null),
      ],
      [
        'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT,
        'label' => __('Sent', 'mailpoet'),
        'count' => $this->countSubscribersInGroup($baseQueryBuilder, ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT),
      ],
      [
        'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED,
        'label' => __('Failed', 'mailpoet'),
        'count' => $this->countSubscribersInGroup($baseQueryBuilder, ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED),
      ],
      [
        'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED,
        'label' => __('Unprocessed', 'mailpoet'),
        'count' => $this->countSubscribersInGroup($baseQueryBuilder, ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED),
      ],
    ];
  }

  /**
   * Counts the rows of the base query, optionally narrowed to one group.
   *
   * The base builder is cloned first, so it can be reused for further counts.
   *
   * @param string|null $group Group name understood by applyGroup(), or null for no group filter.
   */
  private function countSubscribersInGroup(QueryBuilder $baseQueryBuilder, ?string $group): int {
    $countQueryBuilder = clone $baseQueryBuilder;
    $countQueryBuilder->select('COUNT(sts.subscriber) AS subscriberCount');
    if ($group !== null) {
      $this->applyGroup($countQueryBuilder, $group);
    }
    return intval($countQueryBuilder->getQuery()->getSingleScalarResult());
  }

  /**
   * Selects the task-subscriber columns and the subscriber's id, email and names as partial objects.
   */
  protected function applySelectClause(QueryBuilder $queryBuilder) {
    $queryBuilder->select("PARTIAL sts.{task,subscriber,processed,failed,error,createdAt,updatedAt}, PARTIAL s.{id, email, firstName, lastName}");
  }

  /**
   * Queries ScheduledTaskSubscriberEntity as "sts", left-joined with its subscriber as "s".
   */
  protected function applyFromClause(QueryBuilder $queryBuilder) {
    $queryBuilder->from(ScheduledTaskSubscriberEntity::class, 'sts')
      ->leftJoin('sts.subscriber', 's');
  }

  /**
   * Narrows the query to one sending-status group.
   *
   * Sent means processed and not failed, failed means the failed flag is set,
   * unprocessed means the processed flag is not set. Any other group name
   * leaves the query unchanged.
   */
  protected function applyGroup(QueryBuilder $queryBuilder, string $group) {
    if ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT) {
      $queryBuilder
        ->andWhere('sts.processed = :processedStatus')
        ->andWhere('sts.failed = :failedStatus')
        ->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_PROCESSED)
        ->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_OK);
      return;
    }

    if ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED) {
      $queryBuilder
        ->andWhere('sts.failed = :failedStatus')
        ->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
      return;
    }

    if ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED) {
      $queryBuilder
        ->andWhere('sts.processed = :processedStatus')
        ->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
    }
  }
/**
 * Adds the ORDER BY clause for the listing.
 *
 * The sort column and direction are concatenated into the DQL, so they are
 * validated here: anything that is not a plain field name is replaced by the
 * default sort column, and any direction other than DESC becomes ASC.
 * NOTE(review): assumes the listing handler passes field names such as
 * "createdAt" — confirm against MailPoet\Listing\Handler.
 */
protected function applySorting(QueryBuilder $queryBuilder, string $sortBy, string $sortOrder) {
  $isPlainFieldName = preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $sortBy) === 1;

  if ($sortBy === 'id' || !$isPlainFieldName) {
    // ScheduledTaskSubscriber doesn't have id column so the default fallback value 'id'
    // generated in MailPoet\Listing\Handler needs to be changed to something else
    $orderColumn = 'sts.subscriber';
  } elseif ($sortBy === 'subscriberId') {
    // Ordering by subscriberId is mapped to email for consistency with Subscriber listing
    $orderColumn = 's.email';
  } else {
    $orderColumn = "sts.{$sortBy}";
  }

  $direction = strtoupper($sortOrder) === 'DESC' ? 'DESC' : 'ASC';
  $queryBuilder->addOrderBy($orderColumn, $direction);
}

/**
 * Filters by a substring match on the subscriber's email, first name or last name.
 *
 * The search term is passed through Helpers::escapeSearch() and bound as a
 * query parameter.
 */
protected function applySearch(QueryBuilder $queryBuilder, string $search) {
  $escapedSearch = Helpers::escapeSearch($search);
  $queryBuilder->andWhere('s.email LIKE :search or s.firstName LIKE :search or s.lastName LIKE :search');
  $queryBuilder->setParameter('search', "%$escapedSearch%");
}

protected function applyFilters(QueryBuilder $queryBuilder, array $filters) {
  // the parent class requires this method, but scheduled task subscribers listing doesn't currently support this feature.
}

/**
 * Restricts the query to the tasks given in the 'task_ids' parameter.
 * Does nothing when that parameter is missing or empty.
 */
protected function applyParameters(QueryBuilder $queryBuilder, array $parameters) {
  if (empty($parameters['task_ids'])) {
    return;
  }
  $queryBuilder->andWhere('sts.task IN (:taskIds)');
  $queryBuilder->setParameter('taskIds', $parameters['task_ids']);
}

/**
 * Counts the distinct subscribers matched by the listing definition.
 */
public function getCount(ListingDefinition $definition): int {
  $countQueryBuilder = clone $this->queryBuilder;
  $this->applyFromClause($countQueryBuilder);
  $this->applyConstraints($countQueryBuilder, $definition);
  $countQueryBuilder->select("COUNT(DISTINCT sts.subscriber)");

  $distinctSubscribers = $countQueryBuilder->getQuery()->getSingleScalarResult();
  return intval($distinctSubscribers);
}
}
index.php 0000644 00000000006 15073230147 0006362 0 ustar 00 <?php
ScheduledTasksRepository.php 0000644 00000033172 15073230147 0012273 0 ustar 00 <?php // phpcs:ignore SlevomatCodingStandard.TypeHints.DeclareStrictTypes.DeclareStrictTypesMissing

namespace MailPoet\Newsletter\Sending;

if (!defined('ABSPATH')) exit;

use MailPoet\Cron\Workers\Scheduler;
use MailPoet\Cron\Workers\SendingQueue\SendingQueue;
use MailPoet\Doctrine\Repository;
use MailPoet\Entities\NewsletterEntity;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SendingQueueEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Carbon\CarbonImmutable;
use MailPoetVendor\Doctrine\DBAL\Connection;
use MailPoetVendor\Doctrine\ORM\EntityManager;
use MailPoetVendor\Doctrine\ORM\Query\Expr\Join;

/**
 * Queries and bulk operations on scheduled tasks.
 *
 * @extends Repository<ScheduledTaskEntity>
 */
class ScheduledTasksRepository extends Repository {
  /** @var WPFunctions */
  private $wp;

  public function __construct(
    EntityManager $entityManager,
    WPFunctions $wp
  ) {
    $this->wp = $wp;
    parent::__construct($entityManager);
  }

  /**
   * Finds the tasks with the given status whose sending queue belongs to the newsletter.
   *
   * @param NewsletterEntity $newsletter
   * @param string $status
   * @return ScheduledTaskEntity[]
   */
  public function findByNewsletterAndStatus(NewsletterEntity $newsletter, string $status): array {
    return
$this->doctrineRepository->createQueryBuilder('st')
  ->select('st')
  ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
  ->andWhere('st.status = :status')
  ->setParameter('status', $status)
  ->andWhere('sq.newsletter = :newsletter')
  ->setParameter('newsletter', $newsletter)
  ->getQuery()
  ->getResult();
}

/**
 * Finds the task of the newsletter's most recently updated sending queue.
 *
 * @param NewsletterEntity $newsletter
 * @return ScheduledTaskEntity|null Null when the query yields no task.
 */
public function findOneByNewsletter(NewsletterEntity $newsletter): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
    ->andWhere('sq.newsletter = :newsletter')
    ->setParameter('newsletter', $newsletter)
    ->orderBy('sq.updatedAt', 'desc')
    ->setMaxResults(1);

  $latestTask = $queryBuilder->getQuery()->getOneOrNullResult();
  // for phpstan because it detects mixed instead of entity
  if ($latestTask instanceof ScheduledTaskEntity) {
    return $latestTask;
  }
  return null;
}

/**
 * Finds the task that the given sending queue is attached to.
 */
public function findOneBySendingQueue(SendingQueueEntity $sendingQueue): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
    ->andWhere('sq.id = :sendingQueue')
    ->setParameter('sendingQueue', $sendingQueue)
    ->setMaxResults(1);

  $scheduledTask = $queryBuilder->getQuery()->getOneOrNullResult();
  // for phpstan because it detects mixed instead of entity
  return ($scheduledTask instanceof ScheduledTaskEntity) ?
$scheduledTask : null;
}

/**
 * Finds the newsletter's tasks that are scheduled or running.
 *
 * A task matches when its status is "scheduled" or NULL; findRunningByType()
 * in this class also selects running tasks by a NULL status.
 *
 * @param NewsletterEntity $newsletter
 * @return ScheduledTaskEntity[]
 */
public function findByScheduledAndRunningForNewsletter(NewsletterEntity $newsletter): array {
  return $this->doctrineRepository->createQueryBuilder('st')
    ->select('st')
    ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
    ->andWhere('st.status = :status OR st.status IS NULL')
    ->andWhere('sq.newsletter = :newsletter')
    // st.status is a task status, so compare it with the task's own constant
    ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
    ->setParameter('newsletter', $newsletter)
    ->getQuery()
    ->getResult();
}

/**
 * Finds the newsletter's tasks that have the given subscriber attached.
 *
 * @param NewsletterEntity $newsletter
 * @param int $subscriberId
 * @return ScheduledTaskEntity[]
 */
public function findByNewsletterAndSubscriberId(NewsletterEntity $newsletter, int $subscriberId): array {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->select('st')
    ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
    ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
    ->andWhere('sq.newsletter = :newsletter')
    ->andWhere('sts.subscriber = :subscriber')
    ->setParameter('newsletter', $newsletter)
    ->setParameter('subscriber', $subscriberId);

  return $queryBuilder->getQuery()->getResult();
}

/**
 * Finds one task with the "scheduled" status for the newsletter that has the subscriber attached.
 */
public function findOneScheduledByNewsletterAndSubscriber(NewsletterEntity $newsletter, SubscriberEntity $subscriber): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
    ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
    ->andWhere('st.status = :status')
    ->andWhere('sq.newsletter = :newsletter')
    ->andWhere('sts.subscriber = :subscriber')
    ->setMaxResults(1)
    ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
    ->setParameter('newsletter', $newsletter)
    ->setParameter('subscriber', $subscriber);

  $scheduledTask = $queryBuilder->getQuery()->getOneOrNullResult();
  // for phpstan because it detects mixed instead of entity
  return ($scheduledTask instanceof ScheduledTaskEntity) ?
$scheduledTask : null;
}

/**
 * Finds the non-deleted task with the latest scheduledAt whose status is
 * "scheduled" or NULL, optionally limited to one task type.
 *
 * @param string|null $type Task type; an empty value means any type.
 */
public function findScheduledOrRunningTask(?string $type): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->select('st')
    ->where('((st.status = :scheduledStatus) OR (st.status is NULL))')
    ->andWhere('st.deletedAt IS NULL')
    ->orderBy('st.scheduledAt', 'DESC')
    ->setMaxResults(1)
    ->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED);

  if (!empty($type)) {
    $queryBuilder->andWhere('st.type = :type');
    $queryBuilder->setParameter('type', $type);
  }

  return $queryBuilder->getQuery()->getOneOrNullResult();
}

/**
 * Finds the non-deleted task with the latest scheduledAt whose status is
 * "scheduled", optionally limited to one task type.
 *
 * @param string|null $type Task type; an empty value means any type.
 */
public function findScheduledTask(?string $type): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->select('st')
    ->where('st.status = :scheduledStatus')
    ->andWhere('st.deletedAt IS NULL')
    ->orderBy('st.scheduledAt', 'DESC')
    ->setMaxResults(1)
    ->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED);

  if (!empty($type)) {
    $queryBuilder->andWhere('st.type = :type');
    $queryBuilder->setParameter('type', $type);
  }

  return $queryBuilder->getQuery()->getOneOrNullResult();
}

/**
 * Finds a task of the same type that was created before the given task,
 * taking the one with the latest scheduledAt.
 */
public function findPreviousTask(ScheduledTaskEntity $task): ?ScheduledTaskEntity {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st');
  $queryBuilder
    ->select('st')
    ->where('st.type = :type')
    ->andWhere('st.createdAt < :created')
    ->setParameter('type', $task->getType())
    ->setParameter('created', $task->getCreatedAt())
    ->orderBy('st.scheduledAt', 'DESC')
    ->setMaxResults(1);

  return $queryBuilder->getQuery()->getOneOrNullResult();
}

/** Tasks of the type with the "scheduled" status; delegates to findByTypeAndStatus(). */
public function findDueByType($type, $limit = null) {
  $status = ScheduledTaskEntity::STATUS_SCHEDULED;
  return $this->findByTypeAndStatus($type, $status, $limit);
}

/** Tasks of the type with a NULL status; delegates to findByTypeAndStatus(). */
public function findRunningByType($type, $limit = null) {
  $status = null;
  return $this->findByTypeAndStatus($type, $status, $limit);
}

/** Tasks of the type with the "completed" status; delegates to findByTypeAndStatus(). */
public function findCompletedByType($type, $limit = null) {
  $status = ScheduledTaskEntity::STATUS_COMPLETED;
  return $this->findByTypeAndStatus($type, $status, $limit);
}

/** Tasks of the type with the "scheduled" status, queried with the $future flag set. */
public function findFutureScheduledByType($type, $limit = null) {
  return
$this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_SCHEDULED, $limit, true);
}

/**
 * Counts the non-deleted tasks of a type per status.
 *
 * Rows with a NULL status are reported under the virtual "running" status.
 *
 * @return array Map of status => count; the four known statuses are always present.
 */
public function getCountsPerStatus(string $type = 'sending') {
  $stats = [
    ScheduledTaskEntity::STATUS_COMPLETED => 0,
    ScheduledTaskEntity::STATUS_PAUSED => 0,
    ScheduledTaskEntity::STATUS_SCHEDULED => 0,
    ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING => 0,
  ];

  $counts = $this->doctrineRepository->createQueryBuilder('st')
    ->select('COUNT(st.id) as value')
    ->addSelect('st.status')
    ->where('st.deletedAt IS NULL')
    ->andWhere('st.type = :type')
    ->setParameter('type', $type)
    ->addGroupBy('st.status')
    ->getQuery()
    ->getResult();

  foreach ($counts as $count) {
    if ($count['status'] === null) {
      $stats[ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING] = (int)$count['value'];
      continue;
    }
    $stats[$count['status']] = (int)$count['value'];
  }

  return $stats;
}

/**
 * Returns the newest non-deleted tasks for each requested status.
 *
 * Up to $limit tasks are fetched per status (ordered by id, newest first)
 * and the per-status results are concatenated in the order of $statuses.
 *
 * @param string|null $type Task type; a falsy value means any type.
 * @param array $statuses
 * @param int $limit Maximum number of tasks per status.
 * @return array<ScheduledTaskEntity>
 */
public function getLatestTasks(
  $type = null,
  $statuses = [
    ScheduledTaskEntity::STATUS_COMPLETED,
    ScheduledTaskEntity::STATUS_SCHEDULED,
    ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING,
  ],
  $limit = Scheduler::TASK_BATCH_SIZE
) {
  $result = [];
  foreach ($statuses as $status) {
    // The virtual "running" status also has to match rows with a NULL status.
    // The alternative is kept inside one parenthesised condition so that it
    // cannot bypass the deletedAt and type filters.
    $statusCondition = $status === ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING
      ? '(st.status = :status OR st.status IS NULL)'
      : 'st.status = :status';

    // andWhere() rather than a second where(): where() replaces any condition
    // set before it, which would drop the deletedAt filter.
    $tasksQuery = $this->doctrineRepository->createQueryBuilder('st')
      ->select('st')
      ->where('st.deletedAt IS NULL')
      ->andWhere($statusCondition)
      ->setParameter('status', $status);

    if ($type) {
      $tasksQuery = $tasksQuery->andWhere('st.type = :type')
        ->setParameter('type', $type);
    }

    $tasks = $tasksQuery
      ->setMaxResults($limit)
      ->orderBy('st.id', 'desc')
      ->getQuery()
      ->getResult();
    $result = array_merge($result, $tasks);
  }
  return $result;
}

/**
 * Finds non-deleted sending tasks with a NULL status that have a sending queue,
 * ordered by priority and then by updatedAt (both ascending).
 *
 * @return ScheduledTaskEntity[]
 */
public function findRunningSendingTasks(?int $limit = null): array {
  return $this->doctrineRepository->createQueryBuilder('st')
    ->select('st')
    ->join('st.sendingQueue', 'sq')
    ->where('st.type = :type')
    ->andWhere('st.status IS NULL')
    ->andWhere('st.deletedAt IS NULL')
    ->orderBy('st.priority', 'ASC')
    ->addOrderBy('st.updatedAt', 'ASC')
    ->setMaxResults($limit)
    ->setParameter('type', SendingQueue::TASK_TYPE)
    ->getQuery()
    ->getResult();
}

/**
 * Finds the non-deleted tasks of the given type with the "scheduled" status
 * that have the subscriber attached.
 *
 * @param string $type
 * @param SubscriberEntity $subscriber
 * @return ScheduledTaskEntity[]
 */
public function findByTypeAndSubscriber(string $type, SubscriberEntity $subscriber): array {
  $query = $this->doctrineRepository->createQueryBuilder('st')
    ->select('st')
    ->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
    ->where('st.type = :type')
    ->andWhere('sts.subscriber = :subscriber')
    ->andWhere('st.deletedAt IS NULL')
    ->andWhere('st.status = :status')
    ->setParameter('type', $type)
    ->setParameter('subscriber', $subscriber->getId())
    ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
    ->getQuery();
  return $query->getResult();
}

/**
 * Sets updatedAt of the tasks with the given ids to the current WordPress time.
 */
public function touchAllByIds(array $ids): void {
  $now = CarbonImmutable::createFromTimestamp((int)$this->wp->currentTime('timestamp'));
  $this->entityManager->createQueryBuilder()
    ->update(ScheduledTaskEntity::class, 'st')
    ->set('st.updatedAt', ':updatedAt')
    ->setParameter('updatedAt', $now)
    ->where('st.id IN (:ids)')
    ->setParameter('ids', $ids, Connection::PARAM_INT_ARRAY)
    ->getQuery()
    ->execute();

  // update was done via DQL, make sure the entities are also refreshed in the entity manager
  $this->refreshAll(function (ScheduledTaskEntity $entity) use ($ids) {
    return in_array($entity->getId(), $ids, true);
  });
}

/**
 * Finds non-deleted sending tasks with the "scheduled" status whose scheduledAt
 * is not later than the current WordPress time, oldest updatedAt first.
 *
 * @return ScheduledTaskEntity[]
 */
public function findScheduledSendingTasks(?int $limit = null): array {
  $now = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
  return $this->doctrineRepository->createQueryBuilder('st')
    ->select('st')
    ->join('st.sendingQueue', 'sq')
    ->where('st.deletedAt IS NULL')
    ->andWhere('st.status = :status')
    ->andWhere('st.scheduledAt <= :now')
    ->andWhere('st.type = :type')
    ->orderBy('st.updatedAt', 'ASC')
    ->setMaxResults($limit)
    ->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
    ->setParameter('now', $now)
    ->setParameter('type', SendingQueue::TASK_TYPE)
    ->getQuery()
    ->getResult();
}

/**
 * Marks the task as invalid and saves it.
 */
public function invalidateTask(ScheduledTaskEntity $task): void {
  $task->setStatus(ScheduledTaskEntity::STATUS_INVALID);
  $this->persist($task);
  $this->flush();
}

/**
 * Deletes the tasks with the given ids.
 *
 * @param int[] $ids
 */
public function deleteByIds(array $ids): void {
  $this->entityManager->createQueryBuilder()
    ->delete(ScheduledTaskEntity::class, 't')
    ->where('t.id IN (:ids)')
    ->setParameter('ids', $ids)
    ->getQuery()
    ->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (ScheduledTaskEntity $entity) use ($ids) {
    return in_array($entity->getId(), $ids, true);
  });
}

/**
 * Finds non-deleted tasks by type and status, split by scheduled time.
 *
 * @param mixed $type Task type.
 * @param mixed $status Status to match; null matches rows whose status IS NULL.
 * @param mixed $limit Maximum number of results; a falsy value means no limit.
 * @param bool $future When true only tasks scheduled after the current WordPress
 *   time are returned, otherwise only tasks scheduled at or before it.
 */
protected function findByTypeAndStatus($type, $status, $limit = null, $future = false) {
  $queryBuilder = $this->doctrineRepository->createQueryBuilder('st')
    ->select('st')
    ->where('st.type = :type')
    ->setParameter('type', $type)
    ->andWhere('st.deletedAt IS NULL');

  if (is_null($status)) {
    $queryBuilder->andWhere('st.status IS NULL');
  } else {
    $queryBuilder
      ->andWhere('st.status = :status')
      ->setParameter('status', $status);
  }

  $queryBuilder->andWhere($future ? 'st.scheduledAt > :now' : 'st.scheduledAt <= :now');

  $now = Carbon::createFromTimestamp($this->wp->currentTime('timestamp'));
  $queryBuilder->setParameter('now', $now);

  if ($limit) {
    $queryBuilder->setMaxResults($limit);
  }

  return $queryBuilder->getQuery()->getResult();
}

protected function getEntityClassName() {
  return ScheduledTaskEntity::class;
}
}
ScheduledTaskSubscribersRepository.php 0000644 00000022432 15073230147 0014314 0 ustar 00 <?php declare(strict_types = 1);

namespace MailPoet\Newsletter\Sending;

if (!defined('ABSPATH')) exit;

use MailPoet\Doctrine\Repository;
use
MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\InvalidStateException;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Doctrine\DBAL\Connection;
use MailPoetVendor\Doctrine\ORM\QueryBuilder;

/**
 * Queries and bulk operations on the subscribers attached to scheduled tasks.
 *
 * @extends Repository<ScheduledTaskSubscriberEntity>
 */
class ScheduledTaskSubscribersRepository extends Repository {
  protected function getEntityClassName() {
    return ScheduledTaskSubscriberEntity::class;
  }

  /**
   * Tells whether the subscriber has a row for the task with processed = 1.
   */
  public function isSubscriberProcessed(ScheduledTaskEntity $task, SubscriberEntity $subscriber): bool {
    $queryBuilder = $this->doctrineRepository->createQueryBuilder('sts');
    $queryBuilder
      ->andWhere('sts.processed = 1')
      ->andWhere('sts.task = :task')
      ->andWhere('sts.subscriber = :subscriber')
      ->setParameter('subscriber', $subscriber)
      ->setParameter('task', $task);

    $processedRow = $queryBuilder->getQuery()->getOneOrNullResult();
    return !empty($processedRow);
  }

  /**
   * Creates the task-subscriber row for the given ids if it does not exist
   * yet, then sets its processed and failed flags and flushes.
   *
   * @param array $data Requires 'task_id' and 'subscriber_id'; 'processed' and
   *   'failed' are optional and default to unprocessed / not failed.
   * @return ScheduledTaskSubscriberEntity|null Null when a required id is missing.
   * @throws InvalidStateException When no reference can be obtained for the task or subscriber.
   */
  public function createOrUpdate(array $data): ?ScheduledTaskSubscriberEntity {
    if (!isset($data['task_id'], $data['subscriber_id'])) {
      return null;
    }

    $taskSubscriber = $this->findOneBy(['task' => $data['task_id'], 'subscriber' => $data['subscriber_id']]);
    if (!$taskSubscriber) {
      $taskReference = $this->entityManager->getReference(ScheduledTaskEntity::class, (int)$data['task_id']);
      $subscriberReference = $this->entityManager->getReference(SubscriberEntity::class, (int)$data['subscriber_id']);
      if (!($taskReference && $subscriberReference)) {
        throw new InvalidStateException();
      }
      $taskSubscriber = new ScheduledTaskSubscriberEntity($taskReference, $subscriberReference);
      $this->persist($taskSubscriber);
    }

    $processed = $data['processed'] ?? ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED;
    $failed = $data['failed'] ??
ScheduledTaskSubscriberEntity::FAIL_STATUS_OK;
$taskSubscriber->setProcessed($processed);
$taskSubscriber->setFailed($failed);
$this->flush();

return $taskSubscriber;
}

/**
 * Counts the task's unprocessed subscribers whose id is greater than the last processed id.
 */
public function countSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId): int {
  $batchQueryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId);
  $batchQueryBuilder->select('count(sts.subscriber)');

  return intval($batchQueryBuilder->getQuery()->getSingleScalarResult());
}

/**
 * Returns up to $limit ids of the task's unprocessed subscribers above the
 * last processed id, in ascending order.
 */
public function getSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId, int $limit): array {
  $batchQueryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId);
  $batchQueryBuilder
    ->select('IDENTITY(sts.subscriber) AS subscriber_id')
    ->orderBy('sts.subscriber', 'asc')
    ->setMaxResults($limit);

  return $batchQueryBuilder->getQuery()->getSingleColumnResult();
}

/**
 * Marks the given subscribers of the task as processed, then runs the
 * completion check for the task (also when the id list is empty).
 *
 * @param int[] $subscriberIds
 */
public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
  if ($subscriberIds) {
    $updateQueryBuilder = $this->entityManager->createQueryBuilder();
    $updateQueryBuilder
      ->update(ScheduledTaskSubscriberEntity::class, 'sts')
      ->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED)
      ->where('sts.subscriber IN (:subscriberIds)')
      ->andWhere('sts.task = :task')
      ->setParameter('subscriberIds', $subscriberIds, Connection::PARAM_INT_ARRAY)
      ->setParameter('task', $task);
    $updateQueryBuilder->getQuery()->execute();

    // update was done via DQL, make sure the entities are also refreshed in the entity manager
    $this->refreshAll(function (ScheduledTaskSubscriberEntity $entity) use ($task, $subscriberIds) {
      if ($entity->getTask() !== $task) {
        return false;
      }
      return in_array($entity->getSubscriberId(), $subscriberIds, true);
    });
  }

  $this->checkCompleted($task);
}

/**
 * Attaches subscribers to the task with a single INSERT ... SELECT statement.
 */
public function createSubscribersForBounceWorker(ScheduledTaskEntity $scheduledTaskEntity): void {
  $scheduledTaskSubscribersTable =
$this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();

// Every non-deleted subscriber with the subscribed or unconfirmed status is
// inserted as unprocessed; INSERT IGNORE is used so conflicting rows do not fail the statement.
$sql = " INSERT IGNORE INTO " . $scheduledTaskSubscribersTable . " (task_id, subscriber_id, processed) SELECT :taskId AS task_id, s.`id` AS subscriber_id, :unprocessed AS processed FROM " . $subscribersTable . " s WHERE s.`deleted_at` IS NULL AND s.`status` IN (:subscribed, :unconfirmed) ";
$stmt = $this->entityManager->getConnection()->prepare($sql);
$stmt->bindValue('taskId', $scheduledTaskEntity->getId());
$stmt->bindValue('unprocessed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
$stmt->bindValue('subscribed', SubscriberEntity::STATUS_SUBSCRIBED);
$stmt->bindValue('unconfirmed', SubscriberEntity::STATUS_UNCONFIRMED);
$stmt->executeQuery();
}

/**
 * Deletes all task-subscriber rows of the given tasks.
 *
 * @param int[] $ids Task ids.
 */
public function deleteByTaskIds(array $ids): void {
  $deleteQueryBuilder = $this->entityManager->createQueryBuilder();
  $deleteQueryBuilder
    ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
    ->where('sts.task IN (:taskIds)')
    ->setParameter('taskIds', $ids);
  $deleteQueryBuilder->getQuery()->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($ids) {
    $entityTask = $entity->getTask();
    return $entityTask && in_array($entityTask->getId(), $ids, true);
  });
}

/**
 * Deletes all task-subscriber rows of one task.
 */
public function deleteByScheduledTask(ScheduledTaskEntity $scheduledTask): void {
  $deleteQueryBuilder = $this->entityManager->createQueryBuilder();
  $deleteQueryBuilder
    ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
    ->where('sts.task = :task')
    ->setParameter('task', $scheduledTask);
  $deleteQueryBuilder->getQuery()->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask) {
    return $entity->getTask() === $scheduledTask;
  });
}

/**
 * Deletes the given subscribers from one task, then runs the completion check for the task.
 */
public function deleteByScheduledTaskAndSubscriberIds(ScheduledTaskEntity $scheduledTask, array $subscriberIds): void {
$deleteQueryBuilder = $this->entityManager->createQueryBuilder();
$deleteQueryBuilder
  ->delete(ScheduledTaskSubscriberEntity::class, 'sts')
  ->where('sts.task = :task')
  ->andWhere('sts.subscriber IN (:subscriberIds)')
  ->setParameter('task', $scheduledTask)
  ->setParameter('subscriberIds', $subscriberIds, Connection::PARAM_INT_ARRAY);
$deleteQueryBuilder->getQuery()->execute();

// delete was done via DQL, make sure the entities are also detached from the entity manager
$this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask, $subscriberIds) {
  if ($entity->getTask() !== $scheduledTask) {
    return false;
  }
  return in_array($entity->getSubscriberId(), $subscriberIds, true);
});

$this->checkCompleted($scheduledTask);
}

/**
 * Replaces the task's subscribers: removes all existing rows, then creates
 * one row per given subscriber id.
 */
public function setSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
  $this->deleteByScheduledTask($task);
  foreach ($subscriberIds as $subscriberId) {
    $row = [
      'task_id' => $task->getId(),
      'subscriber_id' => $subscriberId,
    ];
    $this->createOrUpdate($row);
  }
}

/**
 * Marks the subscriber's row for the task as failed and processed, stores the
 * error message and runs the completion check. Does nothing when no such row exists.
 */
public function saveError(ScheduledTaskEntity $scheduledTask, int $subscriberId, string $errorMessage): void {
  $taskSubscriber = $this->findOneBy(['task' => $scheduledTask, 'subscriber' => $subscriberId]);
  if (!$taskSubscriber instanceof ScheduledTaskSubscriberEntity) {
    return;
  }

  $taskSubscriber->setFailed(ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
  $taskSubscriber->setProcessed(ScheduledTaskSubscriberEntity::STATUS_PROCESSED);
  $taskSubscriber->setError($errorMessage);
  $this->persist($taskSubscriber);
  $this->flush();
  $this->checkCompleted($scheduledTask);
}

/** Counts the task's rows that are marked as processed. */
public function countProcessed(ScheduledTaskEntity $scheduledTaskEntity): int {
  $criteria = [
    'task' => $scheduledTaskEntity,
    'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED,
  ];
  return $this->countBy($criteria);
}

/** Counts the task's rows that are marked as unprocessed. */
public function countUnprocessed(ScheduledTaskEntity $scheduledTaskEntity): int {
  $criteria = [
    'task' => $scheduledTaskEntity,
    'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED,
  ];
  return $this->countBy($criteria);
}

private function
checkCompleted(ScheduledTaskEntity $task): void {
  // Nothing to do while the task still has unprocessed subscribers
  if ($this->countUnprocessed($task) !== 0) {
    return;
  }

  $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
  $processedAt = Carbon::createFromTimestamp(WPFunctions::get()->currentTime('timestamp'));
  $task->setProcessedAt($processedAt);
  $this->entityManager->flush();
}

/**
 * Builds the shared base query for subscriber batches: unprocessed rows of
 * the task whose subscriber is greater than the last processed subscriber id.
 * No SELECT clause is set here; callers add their own.
 */
private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder
    ->from(ScheduledTaskSubscriberEntity::class, 'sts')
    ->andWhere('sts.task = :taskId')
    ->andWhere('sts.subscriber > :lastProcessedSubscriberId')
    ->andWhere('sts.processed = :status')
    ->setParameter('taskId', $taskId)
    ->setParameter('lastProcessedSubscriberId', $lastProcessedSubscriberId)
    ->setParameter('status', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);

  return $queryBuilder;
}
}
SendingQueuesRepository.php 0000644 00000022412 15073230147 0012137 0 ustar 00 <?php declare(strict_types = 1);

namespace MailPoet\Newsletter\Sending;

if (!defined('ABSPATH')) exit;

use MailPoet\Doctrine\Repository;
use MailPoet\Entities\DynamicSegmentFilterEntity;
use MailPoet\Entities\NewsletterEntity;
use MailPoet\Entities\ScheduledTaskEntity;
use MailPoet\Entities\SegmentEntity;
use MailPoet\Entities\SendingQueueEntity;
use MailPoet\Entities\SubscriberEntity;
use MailPoet\Logging\LoggerFactory;
use MailPoet\Segments\DynamicSegments\FilterFactory;
use MailPoet\WP\Functions as WPFunctions;
use MailPoetVendor\Carbon\Carbon;
use MailPoetVendor\Doctrine\ORM\EntityManager;

/**
 * Queries and state changes for sending queues.
 *
 * @extends Repository<SendingQueueEntity>
 */
class SendingQueuesRepository extends Repository {
  /** @var ScheduledTaskSubscribersRepository */
  private $scheduledTaskSubscribersRepository;

  /** @var WPFunctions */
  private $wp;

  /** @var FilterFactory */
  private $filterFactory;

  /** @var LoggerFactory */
  private $loggerFactory;

  public function __construct(
    EntityManager $entityManager,
    WPFunctions $wp,
    ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository,
FilterFactory $filterFactory,
LoggerFactory $loggerFactory
) {
  parent::__construct($entityManager);
  $this->wp = $wp;
  $this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository;
  $this->filterFactory = $filterFactory;
  $this->loggerFactory = $loggerFactory;
}

protected function getEntityClassName() {
  return SendingQueueEntity::class;
}

/**
 * Finds the newsletter's sending queue whose task has the given status.
 *
 * @param NewsletterEntity $newsletter
 * @param string|null $status Null matches tasks whose status IS NULL.
 * @return SendingQueueEntity|null
 * @throws \MailPoetVendor\Doctrine\ORM\NonUniqueResultException
 */
public function findOneByNewsletterAndTaskStatus(NewsletterEntity $newsletter, $status): ?SendingQueueEntity {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder
    ->select('s')
    ->from(SendingQueueEntity::class, 's')
    ->join('s.task', 't')
    ->andWhere('s.newsletter = :newsletter')
    ->setParameter('newsletter', $newsletter);

  if ($status === null) {
    $queryBuilder->andWhere('t.status IS NULL');
  } else {
    $queryBuilder->andWhere('t.status = :status');
    $queryBuilder->setParameter('status', $status);
  }

  return $queryBuilder->getQuery()->getOneOrNullResult();
}

/**
 * Counts the newsletter's sending queues whose task has the given status.
 */
public function countAllByNewsletterAndTaskStatus(NewsletterEntity $newsletter, string $status): int {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder
    ->select('count(s.task)')
    ->from(SendingQueueEntity::class, 's')
    ->join('s.task', 't')
    ->where('t.status = :status')
    ->andWhere('s.newsletter = :newsletter')
    ->setParameter('status', $status)
    ->setParameter('newsletter', $newsletter);

  $queueCount = $queryBuilder->getQuery()->getSingleScalarResult();
  return intval($queueCount);
}

/**
 * Returns the task ids of all sending queues of the newsletter, as integers.
 */
public function getTaskIdsByNewsletterId(int $newsletterId): array {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder
    ->select('IDENTITY(s.task) as task_id')
    ->from(SendingQueueEntity::class, 's')
    ->andWhere('s.newsletter = :newsletter')
    ->setParameter('newsletter', $newsletterId);

  $rows = $queryBuilder->getQuery()->getArrayResult();
  $taskIds = array_column($rows, 'task_id');
  return array_map('intval', $taskIds);
}

/**
 * Tells whether the subscriber was already processed for the queue's task.
 * Returns false when the queue has no task.
 */
public function isSubscriberProcessed(SendingQueueEntity $queue, SubscriberEntity $subscriber): bool
{
  $queueTask = $queue->getTask();
  if ($queueTask === null) {
    return false;
  }
  return $this->scheduledTaskSubscribersRepository->isSubscriberProcessed($queueTask, $subscriber);
}

/**
 * Finds the sending queues (with their newsletters) of completed sending
 * tasks that have the subscriber attached, optionally bounded by the task's
 * updatedAt (exclusive on both ends).
 *
 * @param \DateTimeInterface|null $dateTo Upper bound; null means no upper bound.
 * @param \DateTimeInterface|null $dateFrom Lower bound; null means no lower bound.
 * @return SendingQueueEntity[]
 */
public function findAllForSubscriberSentBetween(
  SubscriberEntity $subscriber,
  ?\DateTimeInterface $dateTo,
  ?\DateTimeInterface $dateFrom
): array {
  $queryBuilder = $this->entityManager->createQueryBuilder();
  $queryBuilder
    ->select('s, n')
    ->from(SendingQueueEntity::class, 's')
    ->join('s.task', 't')
    ->join('t.subscribers', 'tsub')
    ->join('s.newsletter', 'n')
    ->where('t.status = :status')
    ->andWhere('t.type = :sendingType')
    ->andWhere('tsub.subscriber = :subscriber')
    ->setParameter('status', ScheduledTaskEntity::STATUS_COMPLETED)
    ->setParameter('sendingType', 'sending')
    ->setParameter('subscriber', $subscriber);

  if ($dateTo) {
    $queryBuilder->andWhere('t.updatedAt < :dateTo');
    $queryBuilder->setParameter('dateTo', $dateTo);
  }
  if ($dateFrom) {
    $queryBuilder->andWhere('t.updatedAt > :dateFrom');
    $queryBuilder->setParameter('dateFrom', $dateFrom);
  }

  return $queryBuilder->getQuery()->getResult();
}

/**
 * Pauses the queue's task, unless every subscriber has already been processed.
 */
public function pause(SendingQueueEntity $queue): void {
  if ($queue->getCountProcessed() === $queue->getCountTotal()) {
    return;
  }

  $queueTask = $queue->getTask();
  if (!$queueTask instanceof ScheduledTaskEntity) {
    return;
  }

  $queueTask->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
  $this->flush();
}

/**
 * Resumes the queue's task.
 *
 * When everything was already processed, the task is completed instead (and
 * the newsletter is marked as sent when it allows that). Otherwise the
 * newsletter goes back to sending and the task status is reset to NULL.
 */
public function resume(SendingQueueEntity $queue): void {
  $task = $queue->getTask();
  if (!$task instanceof ScheduledTaskEntity) return;

  if ($queue->getCountProcessed() === $queue->getCountTotal()) {
    $task->setProcessedAt(Carbon::createFromTimestamp($this->wp->currentTime('timestamp')));
    $task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
    // Update also status of newsletter if necessary
    $finishedNewsletter = $queue->getNewsletter();
    if ($finishedNewsletter instanceof NewsletterEntity && $finishedNewsletter->canBeSetSent()) {
      $finishedNewsletter->setStatus(NewsletterEntity::STATUS_SENT);
    }
    $this->flush();
  } else {
    $newsletter = $queue->getNewsletter();
    if (!$newsletter instanceof NewsletterEntity)
return;

$isCorrupt = $newsletter->getStatus() === NewsletterEntity::STATUS_CORRUPT;
if ($isCorrupt) {
  // force a re-render
  $queue->setNewsletterRenderedBody(null);
  $this->persist($queue);
}
$newsletter->setStatus(NewsletterEntity::STATUS_SENDING);
$task->setStatus(null);
$this->flush();
}
}

/**
 * Deletes the sending queues attached to the task.
 */
public function deleteByTask(ScheduledTaskEntity $scheduledTask): void {
  $deleteQueryBuilder = $this->entityManager->createQueryBuilder();
  $deleteQueryBuilder
    ->delete(SendingQueueEntity::class, 'sq')
    ->where('sq.task = :task')
    ->setParameter('task', $scheduledTask);
  $deleteQueryBuilder->getQuery()->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (SendingQueueEntity $entity) use ($scheduledTask) {
    return $entity->getTask() === $scheduledTask;
  });
}

/**
 * Stores the campaign id under the 'campaignId' key of the queue's meta and flushes.
 * Meta that is not an array is replaced by a new array.
 */
public function saveCampaignId(SendingQueueEntity $queue, string $campaignId): void {
  $currentMeta = $queue->getMeta();
  $meta = is_array($currentMeta) ? $currentMeta : [];
  $meta['campaignId'] = $campaignId;
  $queue->setMeta($meta);
  $this->flush();
}

/**
 * Stores a snapshot of the filter segment (id, name, updatedAt and its
 * filters) under the 'filterSegment' key of the queue's meta and flushes.
 */
public function saveFilterSegmentMeta(SendingQueueEntity $queue, SegmentEntity $filterSegmentEntity): void {
  $meta = $queue->getMeta() ??
[];
$meta['filterSegment'] = [
  'id' => $filterSegmentEntity->getId(),
  'name' => $filterSegmentEntity->getName(),
  'updatedAt' => $filterSegmentEntity->getUpdatedAt(),
  'filters' => array_map(function(DynamicSegmentFilterEntity $filterEntity) {
    $filterImplementation = $this->filterFactory->getFilterForFilterEntity($filterEntity);
    $filterDataObject = $filterEntity->getFilterData();
    $snapshot = [
      'filterType' => $filterDataObject->getFilterType(),
      'action' => $filterDataObject->getAction(),
      'data' => $filterEntity->getFilterData()->getData(),
      'lookupData' => [],
    ];
    try {
      $snapshot['lookupData'] = $filterImplementation->getLookupData($filterDataObject);
    } catch (\Throwable $e) {
      // Lookup data is best effort: the failure is logged and the snapshot keeps an empty list
      $logger = $this->loggerFactory->getLogger(LoggerFactory::TOPIC_SEGMENTS);
      $logger->error("Failed to save lookup data for filter {$filterEntity->getId()}: {$e->getMessage()}");
    }
    return $snapshot;
  }, $filterSegmentEntity->getDynamicFilters()->toArray()),
];
$queue->setMeta($meta);
$this->flush();
}

/**
 * Updates the queue's processed / to-process counters and flushes.
 *
 * @param int|null $count Number of subscribers just processed. When it is
 *   null or 0 the counters (including the total) are recalculated from the database.
 */
public function updateCounts(SendingQueueEntity $queue, ?int $count = null): void {
  if ($count) {
    // increment/decrement counts based on known subscriber count, don't exceed the bounds
    $newProcessed = min($queue->getCountProcessed() + $count, $queue->getCountTotal());
    $queue->setCountProcessed($newProcessed);
    $newToProcess = max($queue->getCountToProcess() - $count, 0);
    $queue->setCountToProcess($newToProcess);
  } else {
    // query DB to update counts, slower but more accurate, to be used if count isn't known
    $task = $queue->getTask();
    $processed = $task ? $this->scheduledTaskSubscribersRepository->countProcessed($task) : 0;
    $unprocessed = $task ?
$this->scheduledTaskSubscribersRepository->countUnprocessed($task) : 0;
$queue->setCountProcessed($processed);
$queue->setCountToProcess($unprocessed);
$total = $processed + $unprocessed;
$queue->setCountTotal($total);
}
$this->entityManager->flush();
}

/**
 * Deletes the sending queues of the given newsletters.
 *
 * @param int[] $ids Newsletter ids.
 */
public function deleteByNewsletterIds(array $ids): void {
  $deleteQueryBuilder = $this->entityManager->createQueryBuilder();
  $deleteQueryBuilder
    ->delete(SendingQueueEntity::class, 'q')
    ->where('q.newsletter IN (:ids)')
    ->setParameter('ids', $ids);
  $deleteQueryBuilder->getQuery()->execute();

  // delete was done via DQL, make sure the entities are also detached from the entity manager
  $this->detachAll(function (SendingQueueEntity $entity) use ($ids) {
    $queueNewsletter = $entity->getNewsletter();
    return $queueNewsletter && in_array($queueNewsletter->getId(), $ids, true);
  });
}
}
| ver. 1.4 |
Github
|
.
| PHP 7.4.33 | Generation time: 0 |
proxy
|
phpinfo
|
Settings